diff --git a/setup.py b/setup.py index 5a464b71ea..74916c648f 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ "plyvel==1.0.5", PYEVM_DEPENDENCY, "web3==4.4.1", - "lahja@git+https://github.com/ethereum/lahja.git@5e7924748b90dd14874289cc0ca1ff75df8f733d", + "lahja@git+https://github.com/cburgdorf/lahja-1.git@5eb425362f660d1c44570f4b1d17fae470e24cf5", # noqa: E501 # "lahja==0.12.0", "termcolor>=1.1.0,<2.0.0", "uvloop==0.11.2;platform_system=='Linux' or platform_system=='Darwin' or platform_system=='FreeBSD'", # noqa: E501 diff --git a/tests/core/integration_test_helpers.py b/tests/core/integration_test_helpers.py index dcf72324ec..d40469bd35 100644 --- a/tests/core/integration_test_helpers.py +++ b/tests/core/integration_test_helpers.py @@ -188,8 +188,7 @@ async def run_peer_pool_event_server(event_bus, peer_pool, handler_type=None): peer_pool.cancel_token ) asyncio.ensure_future(event_server.run()) - # Give event subscriptions a moment to propagate - await asyncio.sleep(0.01) + await event_server.events.started.wait() return event_server diff --git a/tests/core/json-rpc/test_ipc.py b/tests/core/json-rpc/test_ipc.py index faf0565c21..d7b979f44f 100644 --- a/tests/core/json-rpc/test_ipc.py +++ b/tests/core/json-rpc/test_ipc.py @@ -14,6 +14,9 @@ to_bytes, to_hex, ) +from lahja import ( + BaseEvent, +) from trinity.nodes.events import ( NetworkIdRequest, @@ -85,10 +88,11 @@ def can_decode_json(potential): async def get_ipc_response( jsonrpc_ipc_pipe_path, request_msg, - event_loop): + event_loop, + event_bus): + + await event_bus.wait_until_all_connections_subscribed_to(BaseEvent) - # Give the event subscriptions a moment to propagate - await asyncio.sleep(0.01) assert wait_for(jsonrpc_ipc_pipe_path), "IPC server did not successfully start with IPC file" reader, writer = await asyncio.open_unix_connection(str(jsonrpc_ipc_pipe_path), loop=event_loop) @@ -218,8 +222,9 @@ async def test_ipc_requests( request_msg, expected, event_loop, + event_bus, ipc_server): - result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop) + result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop, event_bus) assert result == expected @@ -234,7 +239,7 @@ async def test_network_id_ipc_request( mock_request_response(NetworkIdRequest, NetworkIdResponse(1337))(event_bus)) request_msg = build_request('net_version') expected = {'result': '1337', 'id': 3, 'jsonrpc': '2.0'} - result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop) + result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop, event_bus) assert result == expected @@ -324,8 +329,9 @@ async def test_estimate_gas_on_ipc( request_msg, expected, event_loop, + event_bus, ipc_server): - result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop) + result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop, event_bus) assert result == expected @@ -352,8 +358,9 @@ async def test_eth_call_on_ipc( request_msg, expected, event_loop, + event_bus, ipc_server): - result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop) + result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop, event_bus) assert result == expected @@ -426,6 +433,7 @@ async def test_eth_call_with_contract_on_ipc( gas_price, event_loop, ipc_server, + event_bus, expected): function_selector = function_signature_to_4byte_selector(signature) transaction = { @@ -435,7 +443,7 @@ async def test_eth_call_with_contract_on_ipc( 'data': to_hex(function_selector), } request_msg = build_request('eth_call', params=[transaction, 'latest']) - result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop) + result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop, event_bus) assert result == expected @@ -472,7 +480,8 @@ async def test_peer_pool_over_ipc( result = await get_ipc_response( jsonrpc_ipc_pipe_path, request_msg, - event_loop + event_loop, + event_bus, ) assert result == expected @@ -511,7 +520,8 @@ async def test_eth_over_ipc( result = await get_ipc_response( jsonrpc_ipc_pipe_path, request_msg, - event_loop + event_loop, + event_bus, ) assert result == expected @@ -549,13 +559,15 @@ async def test_admin_addPeer_error_messages( jsonrpc_ipc_pipe_path, request_msg, event_loop, + event_bus, expected, ipc_server): result = await get_ipc_response( jsonrpc_ipc_pipe_path, request_msg, - event_loop + event_loop, + event_bus, ) assert result == expected @@ -577,7 +589,8 @@ async def test_admin_addPeer_fires_message( result = await get_ipc_response( jsonrpc_ipc_pipe_path, request, - event_loop + event_loop, + event_bus ) assert result == {'id': 3, 'jsonrpc': '2.0', 'result': None} diff --git a/tests/core/network-db/test_connection_tracker_server.py b/tests/core/network-db/test_connection_tracker_server.py index 3b000f543d..92100ad6be 100644 --- a/tests/core/network-db/test_connection_tracker_server.py +++ b/tests/core/network-db/test_connection_tracker_server.py @@ -1,7 +1,7 @@ import asyncio import pytest -from lahja import BroadcastConfig +from lahja import BroadcastConfig, BaseEvent from p2p.tools.factories import NodeFactory @@ -33,7 +33,7 @@ async def test_connection_tracker_server_and_client(event_loop, event_bus): bus_tracker = ConnectionTrackerClient(event_bus, config=config) # Give `bus_tracker` a moment to setup subscriptions - await asyncio.sleep(0.01) + await event_bus.wait_until_all_connections_subscribed_to(BaseEvent) # ensure we can read from the tracker over the event bus assert await bus_tracker.should_connect_to(remote_a) is False diff --git a/tests/core/p2p-proto/bcc/test_requests.py b/tests/core/p2p-proto/bcc/test_requests.py index ed720c6d14..5e2ba503cd 100644 --- a/tests/core/p2p-proto/bcc/test_requests.py +++ b/tests/core/p2p-proto/bcc/test_requests.py @@ -13,6 +13,9 @@ ) from trinity.constants import TO_NETWORKING_BROADCAST_CONFIG +from trinity.protocol.common.events import ( + PeerPoolMessageEvent, +) from trinity.protocol.bcc.commands import ( BeaconBlocks, ) @@ -56,6 +59,8 @@ async def get_request_server_setup(request, event_loop, event_bus, chain_db): event_bus, TO_NETWORKING_BROADCAST_CONFIG, bob.context.chain_db) asyncio.ensure_future(bob_request_server.run()) + await event_bus.wait_until_all_connections_subscribed_to(PeerPoolMessageEvent) + def finalizer(): event_loop.run_until_complete(bob_request_server.cancel()) event_loop.run_until_complete(event_server.cancel())