Move RequestServer into isolated plugin #617
Conversation
Force-pushed from 855e8e8 to 4f4e995.
Approach looks workable. Lots of boilerplate in some areas, but I think that's a product of the current deficiencies in our need for independent peer classes rather than any issue with your design.
trinity/protocol/eth/peer.py
Outdated
            peer.sub_proto.send_block_headers(ev.headers)

    async def handle_send_block_bodies_events(self) -> None:
        async for ev in self.wait_iter(self.event_bus.stream(SendBlockBodiesEvent)):
These all look so similar. Maybe replace with:

async def handle_event(self, StreamEventType, event_handler_fn):
    async for ev in self.wait_iter(self.event_bus.stream(StreamEventType)):
        try:
            peer = self.get_peer(ev.remote)
        except PeerConnectionLost:
            pass
        else:
            event_handler_fn(peer, ev)
Yep, I cleaned this up a bit more. Not sure what you think about the lambdas. I know you are not a fan but I couldn't figure out an alternative that I was satisfied with.
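For illustration, a hedged sketch of how the generic handler above might be wired up with lambdas — the `SendBlockHeadersEvent` name and the `send_block_bodies`/`ev.blocks` attributes are assumptions, not the actual PR code:

```python
# Hypothetical wiring of the generic handle_event helper with lambdas
# (event and attribute names are illustrative, not taken from the PR):
self.run_daemon_task(self.handle_event(
    SendBlockHeadersEvent,
    lambda peer, ev: peer.sub_proto.send_block_headers(ev.headers),
))
self.run_daemon_task(self.handle_event(
    SendBlockBodiesEvent,
    lambda peer, ev: peer.sub_proto.send_block_bodies(ev.blocks),
))
```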
Force-pushed from 74474c4 to e52dd53.
@pipermerriam This can be given a full review now. It is still blocked on the upcoming lahja release, but other than that it has passed my own judgement :)
Just a quick scan, but LGTM. If you don't get a 👍 soon enough, ping me and I can take a second pass to approve.
@@ -184,4 +188,18 @@ def load_fixture_db(db_fixture, db_class=LevelDB):
        peer_pool.cancel_token
    )
    asyncio.ensure_future(peer_pool_event_bus_request_handler.run())
    # Give event subscriptions a moment to propagate
    await asyncio.sleep(0.01)
I don't understand why this is necessary. Why isn't the next `await` enough?
@@ -87,6 +87,8 @@ def can_decode_json(potential):
        request_msg,
        event_loop):

    # Give the event subscriptions a moment to propagate
    await asyncio.sleep(0.01)
All these sleeps seem like a code smell. If they are required for correctness in non-test code, then we will probably have related bugs sooner or later.
We've got some `wait_for_XX_subscription` APIs coming down the pipe.
Check out the last commit. It gets rid of the sleeps but depends on ethereum/lahja#72 getting merged first.
Btw, I don't think that will bite us in real code. In general, the code should be written in a way that it can handle the other end not being fully booted yet. E.g. we already have things like the `PeerPool` reaching out to discovery to get peers, but then timing out on the first request if discovery isn't yet up and running to answer it.
I think this is more an issue of the test cases, where we wire things together manually and have strict requirements about the other side being fully up and running.
I know this answer is kinda fuzzy and it might be wishful thinking but I think we are safe 😅
I was able to resolve all sleeps except this one with the existing `wait_for` APIs that are already in lahja/master.
For this one, we can't wait for a specific event; we'd rather wait for any event subscription to be propagated. That would be possible with the upcoming `wait_for_x` APIs proposed in ethereum/lahja#74, but I would like to base this PR on top of lahja/master to land it sooner.
So I hope we can live with this one single sleep until we have figured out the best API to wait on subscription propagation.
Maybe add a comment with the proposed replacement code, to be used when the new API is merged/released?
The syntax isn't yet entirely finalized but I promise to come back to this and update it as soon as it lands!
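To make the intent concrete, a hedged sketch of the kind of comment/replacement being discussed — the API name below is a placeholder for whatever ethereum/lahja#74 ends up shipping, and the event type is just an example:

```python
# Give the event subscriptions a moment to propagate.
await asyncio.sleep(0.01)
# TODO: once the new lahja subscription APIs land (ethereum/lahja#74), replace the
# sleep with an explicit wait along these (hypothetical) lines:
#   await event_bus.wait_until_any_endpoint_subscribed_to(GetNodeDataEvent)
```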
Tracking issue?
trinity/protocol/common/events.py
Outdated
@@ -41,11 +54,24 @@ def expected_response_type() -> Type[PeerCountResponse]:
         return PeerCountResponse


-class DisconnectPeerEvent(BaseEvent):
+class DisconnectPeerEvent(HasRemoteEvent):
     """
     Event broadcasted when we want to disconnect from a peer
     """

     def __init__(self, remote: Node, reason: DisconnectReason) -> None:
         self.remote = remote
I guess this could also now be:

-        self.remote = remote
+        super().__init__(remote)
I'd advocate for dropping the `HasRemoteEvent` and just doing it explicitly. It doesn't appear to save any lines of code, and I'd argue it makes readability worse.

def __init__(self, remote: Node, other_arg: int):
    super().__init__(remote)
    self.other_arg = other_arg

# vs

def __init__(self, remote: Node, other_arg: int):
    self.remote = remote
    self.other_arg = other_arg
We need that event for the `bound` in https://github.com/ethereum/trinity/pull/617/files#diff-8067591e62bc9b699e77ac5f8671eb6eR47. Not sure how we would write the `run_event_daemon` otherwise? Maybe with a `typing.Protocol`, but that would mean it would work with anything that has a `remote` property even if it isn't a `BaseEvent`.
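For context, a rough sketch (not the actual PR code) of how a `run_event_daemon` built around that bound might look. It assumes the method lives on the peer-pool event server, which already provides `wait_iter`, `event_bus` and `get_peer` as in the snippet quoted earlier; the import path is an assumption:

```python
from typing import Callable, Type, TypeVar

from p2p.exceptions import PeerConnectionLost  # import path assumed

# Bound to HasRemoteEvent so that `event.remote` is guaranteed to exist.
TPeerEvent = TypeVar('TPeerEvent', bound='HasRemoteEvent')


async def run_event_daemon(
        self,
        event_type: Type[TPeerEvent],
        handler: Callable[['BasePeer', TPeerEvent], None]) -> None:
    async for event in self.wait_iter(self.event_bus.stream(event_type)):
        try:
            # Only valid because the TypeVar is bound to HasRemoteEvent.
            peer = self.get_peer(event.remote)
        except PeerConnectionLost:
            continue
        handler(peer, event)
```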
Maybe `HasRemoteEvent` turns into an abstract class with an abstract property `remote`?
Unfortunately `mypy` goes absolutely crazy with abstract properties. Even with the suggested solution of using `type: ignore` I couldn't get it to work: python/mypy#1362
So I've gone back to the non-abstract version and just properly using `super().__init__(remote)` everywhere.
Hm, we do this in a number of places. What's the difference here? E.g.:

trinity/trinity/extensibility/plugin.py, lines 80 to 83 in 675c583

@property
@abstractmethod
def event_bus(self) -> TrinityEventBusEndpoint:
    pass
Oh yeah, I was doing something stupid. Updated it. It does feel a bit boilerplatey but could be worse...
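A minimal sketch, following the plugin.py pattern above, of what the abstract-property version of `HasRemoteEvent` might look like (the `Node` import path is an assumption, and this is not necessarily the exact code that landed):

```python
from abc import abstractmethod

from lahja import BaseEvent
from p2p.kademlia import Node  # import path assumed


class HasRemoteEvent(BaseEvent):
    """Base class for events that carry a reference to a remote node."""

    @property
    @abstractmethod
    def remote(self) -> Node:
        pass
```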
        await self.cancel_token.wait()
        self.run_daemon_task(self.handle_native_peer_messages())

        await self.cancellation()
👍
Just in case anyone wants to review this: hold back, I'm testing some things out to fine-tune this.
Force-pushed from c6e8303 to 95a9ad3.
Still a somewhat incomplete review but got through most of it.
    return event_server


async def run_request_server(event_bus, chaindb, server_type=None):
This seems like a good candidate for an `asynccontextmanager` to manage setup and teardown? Not sure if that helps. Maybe teardown happens fine, but in the `trio` world we'll have to convert it to use that pattern or provide this function with a handle to a nursery...
Sounds good in general. I just wonder how to deal with the rightward drift when you need multiple of them, e.g.:

async with run_event_server():
    async with run_request_server():
        async with run_foo():
            pass

It would be nice to be able to say:

async with (
    run_event_server(),
    run_request_server(),
    run_foo()
):
    pass
Looking for something like this? https://stackoverflow.com/a/1073814/8412986
Planning to convert this to the `asynccontextmanager` in this PR?
Woah, I had no idea this was actually possible! Thanks @carver for bringing it up!
This works just fine.
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def foo():
    try:
        print("foo")
        yield 1
    finally:
        print("cleaning foo")


@asynccontextmanager
async def bar():
    try:
        print("bar")
        yield 2
    finally:
        print("cleaning bar")


async def meh():
    async with foo(), bar():
        print("wohoo")


asyncio.run(meh())
(sorry for the too-soon review. I just saw your request to hold off)
Force-pushed from 95a9ad3 to ee44f6d.
Force-pushed from d03d603 to 5c6ed9e.
@@ -29,7 +29,8 @@
         "plyvel==1.0.5",
         PYEVM_DEPENDENCY,
         "web3==4.4.1",
-        "lahja==0.12.0",
+        "lahja@git+https://github.com/ethereum/lahja.git@22396a8fe79809357438e917e66a7101e6e3ac01",  # noqa: E501
@carver @pipermerriam I wonder what's the best thing to do here. Ideally I'd like to get this change in sooner rather than later, because it touches many files and especially the eth2 code changes at such a rapid pace (rightfully so 👏) that I have to resolve conflicts with every rebase.
So I could cut a new lahja release from master now. However, I feel it would just be a matter of a few more days until we want to cut another release soon after. So, if you agree that this PR could land soon, do you think it would be acceptable to keep lahja pinned to this git ref from current lahja/master just for a few days?
Yeah, I think it's okay temporarily. As long as getting lahja released with the needed changes becomes top priority :). Don't want it to hold up the next trinity release.
Don't know what the plan is here, but in case this got dropped: a friendly reminder that master is still using this git revision.
Force-pushed from 5c6ed9e to c231dbf.
setup.py
Outdated
@@ -29,7 +29,8 @@
         "plyvel==1.0.5",
         PYEVM_DEPENDENCY,
         "web3==4.4.1",
-        "lahja==0.12.0",
+        "lahja@git+https://github.com/ethereum/lahja.git@22396a8fe79809357438e917e66a7101e6e3ac01",  # noqa: E501
+        # "lahja==0.12.0",
Let's just drop this comment. We'll know which version to bring back after releasing the new lahja. Also, I think it would be good practice to use `>=0.12.0,<0.13.0`, so that we can make bugfix patch releases on lahja without forcing a trinity patch release.
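For illustration, an abridged sketch of what that bounded pin could look like once the new lahja release is out — the exact bounds are the reviewer's suggestion above, not a decision made in this thread:

```python
# setup.py (sketch, abridged) — a bounded pin instead of an exact one, so lahja
# bugfix releases don't force a trinity patch release:
install_requires = [
    "web3==4.4.1",
    "lahja>=0.12.0,<0.13.0",
]
```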
    await peer_pool_event_bus_request_handler.events.started.wait()
    asyncio.ensure_future(event_server.run())

    await event_server.events.started.wait()
Note to self: These two lines are so common that we probably want a one-liner, like `await event_server.launch()`, that starts the `BaseService` in the background but doesn't return until it's initialized. Piper's got bigger ideas for the service refactor, so we'll probably delay that syntactic sugar until after a refactor.
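A minimal sketch of that one-liner idea, assuming the service exposes `run()` and `events.started` as in the snippet above (this helper does not exist in the code base):

```python
import asyncio


async def launch(service) -> None:
    # Start the BaseService in the background...
    asyncio.ensure_future(service.run())
    # ...but only return once it reports itself as started.
    await service.events.started.wait()

# Usage: `await launch(event_server)` would replace the two lines quoted above.
```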
            await self._handle_msg(remote, cmd, msg)
        except OperationCancelled:
            # Silently swallow OperationCancelled exceptions because otherwise they'll be caught
            # by the except below and treated as unexpected.
Maybe add a debug log just so we notice if this is happening unexpectedly.
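A sketch of what that debug log could look like, mirroring the handler quoted above — the log wording and placement are illustrative, not part of the PR:

```python
try:
    await self._handle_msg(remote, cmd, msg)
except OperationCancelled:
    # Still swallow the cancellation, but leave a trace so unexpected occurrences
    # can be spotted in the logs (suggested addition, wording illustrative):
    self.logger.debug("Handling of %s msg from %s was cancelled", cmd, remote)
except Exception:
    self.logger.exception("Unexpected error when processing msg from %s", remote)
```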
Wait, when would that be unexpected and how would I recognize that? Also notice that this code did not diverge from master.
trinity/trinity/protocol/common/servers.py
Lines 58 to 70 in 675c583
    async def _quiet_handle_msg(
            self,
            peer: BasePeer,
            cmd: protocol.Command,
            msg: protocol._DecodedMsgType) -> None:
        try:
            await self._handle_msg(peer, cmd, msg)
        except OperationCancelled:
            # Silently swallow OperationCancelled exceptions because otherwise they'll be caught
            # by the except below and treated as unexpected.
            pass
        except Exception:
            self.logger.exception("Unexpected error when processing msg from %s", peer)
Yeah, I guess it's fine for now. It will be handled differently after switching to `trio` eventually, so we can handle it then. (To answer your question, we wouldn't recognize that anything was wrong in the code. We might recognize it by browsing the logs and noticing an unusual number or location of the log lines.)
    pass


class GetNodeDataEvent(PeerPoolMessageEvent):
cc to make sure @lithp notices that the firehose request server will probably need some changes after this merges. (Like adding a new event)
Thanks for the ping! I think what you're saying is that I should race to merge Firehose before this PR lands ;)
Hahaha! This PR is like a black hole. Eventually it sucks in the entire code base... at this point my whole job will be about rebasing this PR until eternity 😅
trinity/protocol/eth/peer.py
Outdated
""" | ||
A ``ETHPeer`` that can be used from any process as a drop-in replacement for the actual | ||
peer that sits in the peer pool. Any action performed on the ``ETHProxyPeer`` is delegated | ||
to the actual peer in the pool. |
Is this comment correct? There are a lot of methods on a peer. What about `disconnect`, for example? I only see `sub_proto` implemented.
That comment is a bit ahead of its time, haha. In the follow-up PR that also touches the sync, this becomes a proper `BaseService`-derived class, and I also had a `typing_extensions.Protocol` called `ETHPeerLike` that abstracted away the usage of the actual peer or the proxy peer.
I will adjust the comment for this PR so that it properly reflects the status quo.
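A hedged sketch of what such an `ETHPeerLike` protocol might look like — the actual definition lives in the follow-up PR, so the shape below is an assumption:

```python
from typing import Any

from typing_extensions import Protocol


class ETHPeerLike(Protocol):
    """Structural type that both the real ETHPeer and the ETHProxyPeer could satisfy."""

    @property
    def sub_proto(self) -> Any:  # the concrete sub-protocol type is left open in this sketch
        ...
```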
         else:
-            self.logger.debug("%s msg from %s not implemented", cmd, peer)
+            self.logger.debug("%s msg not handled yet, needs to be implemented", cmd)
The peer info could theoretically be interesting (here and in the eth server). If some peer is requesting unexpected/non-standard commands, it might be nice to identify the type of client that's running by scanning the logs for the peer hash and its corresponding client string.
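A one-line sketch of keeping the peer in the message, combining the two log lines from the diff above (the exact format string is illustrative):

```python
else:
    # Keep the peer in the log line so the client behind an unhandled command
    # can be identified later from the logs.
    self.logger.debug("%s msg from %s not handled yet, needs to be implemented", cmd, peer)
```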
Notice that the only reason we could run into this is if we mistakenly add a subscription for an event type that we then forget to handle. In other words, if we see something here, it is always our own misconfiguration.
Force-pushed from ba2cce3 to 7695de5.
Ok, still missing some
Force-pushed from 161e079 to aff2f7d.
Looks like you are debugging something, but it seems good to go. Feel free to re-ping me if the solution you end up with needs review, too.
Force-pushed from aff2f7d to 66c35b2.
I think the test that scans the shutdown for errors (which is currently a …) … I marked the test as …
        self._remote = remote
        self.nodes = nodes

    @property
I think that you can define these classes without inheriting from `HasRemoteEvent`, set this property normally, and still get away with telling `mypy` that something takes a `TypeVar('Something', bound=HasRemoteEvent)` — and it will still just work.
Only if `HasRemoteEvent` were a `typing_extensions.Protocol` so that structural typing is used. But that would allow any kind of thing with a `remote` property to be used here and not enforce proper `BaseEvent` checking. Unfortunately mypy doesn't allow multiple bounds (yet?), so we cannot do something like `TypeVar('something', bound=Intersection[BaseEvent, SupportsRemote])`.
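A sketch of the trade-off described above — `SupportsRemote`, the stand-in `HasRemoteEvent`, and the import paths here are illustrative, not code from the PR:

```python
from typing import TypeVar

from typing_extensions import Protocol

from lahja import BaseEvent
from p2p.kademlia import Node  # import path assumed


class SupportsRemote(Protocol):
    # Structural typing: *anything* with a `remote` property matches, even non-events.
    @property
    def remote(self) -> Node:
        ...


class HasRemoteEvent(BaseEvent):
    # Nominal typing: only real BaseEvent subclasses can inherit from this.
    def __init__(self, remote: Node) -> None:
        self.remote = remote


# Too permissive: loses the guarantee that we are dealing with a BaseEvent.
TStructural = TypeVar('TStructural', bound=SupportsRemote)

# Keeps BaseEvent checking, which is why HasRemoteEvent stays a concrete base class.
TPeerEvent = TypeVar('TPeerEvent', bound=HasRemoteEvent)
```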
        )

        if isinstance(cmd, ignored_commands):
            pass
Maybe a log message at debug level so this doesn't get completely lost?
Notice that this code just moved; we've had this construct before. But I will send a follow-up PR to just forward these two events, because we will need that anyway when we start moving the transaction pool into a `BaseIsolatedPlugin` as well.
What was wrong?

It is our long term plan to have the RequestServer, the PeerPool and the Sync run in their own dedicated, isolated processes. The benefits are:

- The PeerPool will be free from any CPU-intensive work (e.g. block verification etc.)
- Each process can be inspected on its own with tools such as py-spy

Currently, these three things still live together in one process that is so far known as the networking process.

How was it fixed?

To-Do

- Add entry to the release notes
- BCCRequestServer via the same plugin
- setup.py

Cute Animal Picture