Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Move RequestServer out of networking process
Browse files Browse the repository at this point in the history
  • Loading branch information
cburgdorf committed May 22, 2019
1 parent 864a3ea commit efedf94
Show file tree
Hide file tree
Showing 32 changed files with 1,196 additions and 306 deletions.
1 change: 0 additions & 1 deletion p2p/tools/paragon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
ParagonPeer,
ParagonPeerFactory,
ParagonPeerPool,
ParagonPeerPoolEventServer,
)
from .helpers import ( # noqa: F401
get_directly_connected_streams,
Expand Down
12 changes: 0 additions & 12 deletions p2p/tools/paragon/events.py

This file was deleted.

30 changes: 2 additions & 28 deletions p2p/tools/paragon/peer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from typing import (
Iterable,
)
from p2p.exceptions import (
PeerConnectionLost,
)
from p2p.peer import (
BasePeer,
BasePeerContext,
Expand All @@ -17,13 +14,10 @@
_DecodedMsgType,
)

from trinity.protocol.common.peer_pool_event_bus import (
PeerPoolEventServer,
from .proto import (
ParagonProtocol,
)

from .events import GetSumRequest
from .proto import ParagonProtocol


class ParagonPeer(BasePeer):
supported_sub_protocols = (ParagonProtocol,)
Expand Down Expand Up @@ -51,26 +45,6 @@ class ParagonPeerFactory(BasePeerFactory):
context: ParagonContext


class ParagonPeerPoolEventServer(PeerPoolEventServer[ParagonPeer]):
"""
A request handler to handle paragon specific requests to the peer pool.
"""

async def _run(self) -> None:
self.logger.debug("Running ParagonPeerPoolEventServer")
self.run_daemon_task(self.handle_get_sum_requests())
await super()._run()

async def handle_get_sum_requests(self) -> None:
async for req in self.wait_iter(self.event_bus.stream(GetSumRequest)):
try:
peer = self.get_peer(req.remote)
except PeerConnectionLost:
pass
else:
peer.sub_proto.send_get_sum(req.a, req.b)


class ParagonPeerPool(BasePeerPool):
peer_factory_class = ParagonPeerFactory
context: ParagonContext
Expand Down
25 changes: 21 additions & 4 deletions tests/core/integration_test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
from eth.db.header import HeaderDB
from eth.vm.forks.byzantium import ByzantiumVM

from trinity.constants import TO_NETWORKING_BROADCAST_CONFIG
from trinity.db.base import BaseAsyncDB
from trinity.db.eth1.chain import BaseAsyncChainDB
from trinity.db.eth1.header import BaseAsyncHeaderDB

from trinity.protocol.common.peer_pool_event_bus import (
PeerPoolEventServer,
)
from trinity.protocol.eth.servers import (
ETHRequestServer,
)

ZIPPED_FIXTURES_PATH = Path(__file__).parent.parent / 'integration' / 'fixtures'

Expand Down Expand Up @@ -174,16 +178,29 @@ def load_fixture_db(db_fixture, db_class=LevelDB):
yield db_class(Path(tmpdir) / db_fixture.value)


async def make_peer_pool_answer_event_bus_requests(event_bus, peer_pool, handler_type=None):
async def run_peer_pool_event_server(event_bus, peer_pool, handler_type=None):

handler_type = PeerPoolEventServer if handler_type is None else handler_type

peer_pool_event_bus_request_handler = handler_type(
event_server = handler_type(
event_bus,
peer_pool,
peer_pool.cancel_token
)
asyncio.ensure_future(peer_pool_event_bus_request_handler.run())
asyncio.ensure_future(event_server.run())
# Give event subscriptions a moment to propagate
await asyncio.sleep(0.01)
await peer_pool_event_bus_request_handler.events.started.wait()
await event_server.events.started.wait()
return event_server


async def run_request_server(event_bus, chaindb, server_type=None):
server_type = ETHRequestServer if server_type is None else server_type
request_server = server_type(
event_bus,
TO_NETWORKING_BROADCAST_CONFIG,
chaindb,
)
asyncio.ensure_future(request_server.run())
await request_server.events.started.wait()
return request_server
10 changes: 7 additions & 3 deletions tests/core/p2p-proto/bcc/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,19 @@ async def get_directly_linked_peers(request, event_loop, alice_chain_db, bob_cha
async def get_directly_linked_peers_in_peer_pools(request,
event_loop,
alice_chain_db,
bob_chain_db):
bob_chain_db,
alice_peer_pool_event_bus=None,
bob_peer_pool_event_bus=None):
alice, bob = await get_directly_linked_peers(
request,
event_loop,
alice_chain_db=alice_chain_db,
bob_chain_db=bob_chain_db,
)
alice_peer_pool = BCCPeerPool(alice.transport._private_key, alice.context)
bob_peer_pool = BCCPeerPool(bob.transport._private_key, bob.context)
alice_peer_pool = BCCPeerPool(
alice.transport._private_key, alice.context, event_bus=alice_peer_pool_event_bus)
bob_peer_pool = BCCPeerPool(
bob.transport._private_key, bob.context, event_bus=bob_peer_pool_event_bus)

asyncio.ensure_future(alice_peer_pool.run())
asyncio.ensure_future(bob_peer_pool.run())
Expand Down
32 changes: 23 additions & 9 deletions tests/core/p2p-proto/bcc/test_full_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@

from eth2.beacon.types.blocks import BeaconBlock

from trinity.constants import TO_NETWORKING_BROADCAST_CONFIG
from trinity.protocol.bcc.peer import BCCPeerPoolEventServer
from trinity.protocol.bcc.servers import BCCRequestServer

from trinity.sync.beacon.chain import BeaconChainSyncer

from tests.core.integration_test_helpers import (
run_peer_pool_event_server,
)
from .helpers import (
SERENITY_GENESIS_CONFIG,
get_directly_linked_peers_in_peer_pools,
Expand All @@ -29,6 +34,7 @@ def import_block(self, block):
async def get_sync_setup(
request,
event_loop,
event_bus,
alice_chain_db,
bob_chain_db,
genesis_config=SERENITY_GENESIS_CONFIG):
Expand All @@ -37,15 +43,19 @@ async def get_sync_setup(
event_loop,
alice_chain_db=alice_chain_db,
bob_chain_db=bob_chain_db,
bob_peer_pool_event_bus=event_bus,
)

bob_request_server = BCCRequestServer(bob.context.chain_db, bob_peer_pool)
bob_request_server = BCCRequestServer(
event_bus, TO_NETWORKING_BROADCAST_CONFIG, bob.context.chain_db)

alice_syncer = BeaconChainSyncer(
alice_chain_db,
alice_peer_pool,
NoopBlockImporter(),
genesis_config,
)
await run_peer_pool_event_server(event_bus, bob_peer_pool, handler_type=BCCPeerPoolEventServer)

asyncio.ensure_future(bob_request_server.run())
asyncio.ensure_future(alice_syncer.run())
Expand All @@ -59,13 +69,14 @@ def finalizer():


@pytest.mark.asyncio
async def test_sync_from_genesis(request, event_loop):
async def test_sync_from_genesis(request, event_loop, event_bus):
genesis = create_test_block()
bob_blocks = (genesis,) + create_branch(length=99, root=genesis)
alice_chain_db = await get_chain_db((genesis,))
bob_chain_db = await get_chain_db(bob_blocks)

alice_syncer = await get_sync_setup(request, event_loop, alice_chain_db, bob_chain_db)
alice_syncer = await get_sync_setup(
request, event_loop, event_bus, alice_chain_db, bob_chain_db)

await alice_syncer.events.finished.wait()

Expand All @@ -80,14 +91,15 @@ async def test_sync_from_genesis(request, event_loop):


@pytest.mark.asyncio
async def test_sync_from_old_head(request, event_loop):
async def test_sync_from_old_head(request, event_loop, event_bus):
genesis = create_test_block()
alice_blocks = (genesis,) + create_branch(length=49, root=genesis)
bob_blocks = alice_blocks + create_branch(length=50, root=alice_blocks[-1])
alice_chain_db = await get_chain_db(alice_blocks)
bob_chain_db = await get_chain_db(bob_blocks)

alice_syncer = await get_sync_setup(request, event_loop, alice_chain_db, bob_chain_db)
alice_syncer = await get_sync_setup(
request, event_loop, event_bus, alice_chain_db, bob_chain_db)

await alice_syncer.events.finished.wait()

Expand All @@ -103,14 +115,15 @@ async def test_sync_from_old_head(request, event_loop):


@pytest.mark.asyncio
async def test_reorg_sync(request, event_loop):
async def test_reorg_sync(request, event_loop, event_bus):
genesis = create_test_block()
alice_blocks = (genesis,) + create_branch(length=49, root=genesis, state_root=b"\x11" * 32)
bob_blocks = (genesis,) + create_branch(length=99, root=genesis, state_root=b"\x22" * 32)
alice_chain_db = await get_chain_db(alice_blocks)
bob_chain_db = await get_chain_db(bob_blocks)

alice_syncer = await get_sync_setup(request, event_loop, alice_chain_db, bob_chain_db)
alice_syncer = await get_sync_setup(
request, event_loop, event_bus, alice_chain_db, bob_chain_db)

await alice_syncer.events.finished.wait()

Expand All @@ -126,14 +139,15 @@ async def test_reorg_sync(request, event_loop):


@pytest.mark.asyncio
async def test_sync_when_already_at_best_head(request, event_loop):
async def test_sync_when_already_at_best_head(request, event_loop, event_bus):
genesis = create_test_block()
alice_blocks = (genesis,) + create_branch(length=99, root=genesis, state_root=b"\x11" * 32)
bob_blocks = (genesis,) + create_branch(length=50, root=genesis, state_root=b"\x22" * 32)
alice_chain_db = await get_chain_db(alice_blocks)
bob_chain_db = await get_chain_db(bob_blocks)

alice_syncer = await get_sync_setup(request, event_loop, alice_chain_db, bob_chain_db)
alice_syncer = await get_sync_setup(
request, event_loop, event_bus, alice_chain_db, bob_chain_db)

await alice_syncer.events.finished.wait()

Expand Down
Loading

0 comments on commit efedf94

Please sign in to comment.