Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Windows support for voltaire_bundler #22

Open
wants to merge 1 commit into
base: experimental
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
keystore
*__pycache__
.vscode
.idea
*.ipc
**/.env
**/.aws
Expand Down
260 changes: 157 additions & 103 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ classifiers = [

[tool.poetry.dependencies]
python = "^3.12"
uvloop = "^0.19.0"
uvloop = { version = "^0.19.0", markers = "sys_platform != 'win32'" }
eth-abi = "^5.1.0"
eth-account = "^0.13.0"
prometheus-client = "^0.20.0"
aiohttp-cors = "^0.7.0"
aiohttp = "^3.9.5"
aiohttp = "^3.10.11"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.0"
Expand Down
4 changes: 2 additions & 2 deletions voltaire_bundler/cli_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,8 @@ def initialize_argument_parser() -> ArgumentParser:
def init_logging(args: Namespace):
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.WARNING,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%b %d %H:%M:%S.%03d",
format="%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
datefmt="%b %d %H:%M:%S",
)

logging.getLogger("Voltaire")
Expand Down
102 changes: 91 additions & 11 deletions voltaire_bundler/event_bus_manager/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import inspect
import logging
import pickle
import sys
from dataclasses import field
from functools import partial
from typing import Any, Awaitable, Callable, Dict, Optional
Expand All @@ -25,7 +26,45 @@
ResponseEvent = Dict[str, Any]
ResponseFunction = Callable[[Any], Awaitable[ResponseEvent]]
PartialResponseFunction = partial[Awaitable[ResponseEvent]]
DEFAULT_LIMIT = 2 ** 16

async def _start_pipe_server(client_connected_cb, *, path,
                             loop=None, limit=DEFAULT_LIMIT):
    """Start listening for connections on a Win32 named pipe.

    Windows-only analogue of ``asyncio.start_unix_server`` built on the
    Proactor event loop's ``start_serving_pipe``.

    :param client_connected_cb: callback invoked with (reader, writer)
        for each new client connection.
    :param path: named-pipe address (``\\\\.\\pipe\\...``).
    :param loop: event loop to use; defaults to the running loop.
    :param limit: StreamReader buffer limit in bytes.
    :return: the pipe server object.
    """
    # get_event_loop() is deprecated when called from a coroutine;
    # get_running_loop() is the supported replacement on Python 3.10+.
    loop = loop or asyncio.get_running_loop()

    def factory():
        reader = asyncio.StreamReader(limit=limit, loop=loop)
        return asyncio.StreamReaderProtocol(
            reader, client_connected_cb, loop=loop
        )

    # NOTE: the returned pipe server object has no "wait_closed()"
    # coroutine method, unlike asyncio.Server.
    server, *_ = await loop.start_serving_pipe(factory, address=path)
    return server


async def _open_pipe_connection(path: Any, *, loop=None,
                                limit=DEFAULT_LIMIT, **kwds):
    """Connect to a server over a Win32 named pipe.

    Windows-only analogue of ``asyncio.open_unix_connection`` built on
    the Proactor event loop's ``create_pipe_connection``.

    :param path: named-pipe address (``\\\\.\\pipe\\...``).
    :param loop: event loop to use; defaults to the running loop.
    :param limit: StreamReader buffer limit in bytes.
    :return: an ``(asyncio.StreamReader, asyncio.StreamWriter)`` pair.
    """
    # get_event_loop() is deprecated when called from a coroutine;
    # get_running_loop() is the supported replacement on Python 3.10+.
    loop = loop or asyncio.get_running_loop()

    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_pipe_connection(
        lambda: protocol, path, **kwds
    )
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    return reader, writer

# Alias UNIX socket / Win32 named pipe functions to platform-agnostic names.
# Callers use `open_ipc_connection` and get named pipes on Windows and
# UNIX domain sockets everywhere else, with the same (reader, writer) API.
if sys.platform == 'win32':
    open_ipc_connection = _open_pipe_connection
else:
    open_ipc_connection = asyncio.open_unix_connection

class Endpoint:
"""This is a class representation of an Endpoint that can receive request
Expand All @@ -51,17 +90,52 @@ def __init__(self, id: str) -> None:
self.event_names = []
self.response_functions_list = []

async def _serve_until(self, cancel, filepath: str, ready=None):
"""IPC server."""
server = await asyncio.wait_for(
_start_pipe_server(self._handle_request_cb, path=filepath),
timeout=5.0
)
try:
ready.set_result(None)
await cancel
finally:
server.close()
if hasattr(server, 'wait_closed'):
await server.wait_closed()
else:
server.close()

async def _start_server(self, filepath: str):
server = await asyncio.start_unix_server(
self._handle_request_cb, filepath)
async with server:
await server.serve_forever()

async def _start_server_win32(self, filepath: str):
loop = asyncio.get_event_loop()
cancel = asyncio.Future()
path = rf'\\.\pipe\{filepath.replace('.ipc', '')}'
ready = asyncio.Future()
server = loop.create_task(self._serve_until(
cancel=cancel, filepath=path, ready=ready
))
try:
await ready
await cancel
finally:
await server

async def start_server(self, filepath: str) -> None:
"""
Starts the Enpoint server to listen to requests on an IPC socket
Starts the Endpoint server to listen to requests on an IPC socket
It creates the .ipc file if it doesn't exist
"""
logging.info("Starting " + self.id)
# filepath = self.id + ".ipc"
server = await asyncio.start_unix_server(
self._handle_request_cb, filepath)
async with server:
await server.serve_forever()
if sys.platform == 'win32':
await self._start_server_win32(filepath)
else:
await self._start_server(filepath)

def add_event_and_response_function(
self,
Expand Down Expand Up @@ -157,8 +231,11 @@ async def request(self, request_event: RequestEvent) -> ResponseEvent:
This function establishes a Unix socket connection to an Endpoint,
sends a RequestEvent, and waits for a ResponseEvent.
"""
filepath = self.server_id + ".ipc"
reader, writer = await asyncio.open_unix_connection(filepath)
if sys.platform == 'win32':
path = rf'\\.\pipe\{self.server_id}'
else:
path = self.server_id + ".ipc"
reader, writer = await open_ipc_connection(path)

await _broadcast(request_event, writer)
response_event: ResponseEvent = await _listen(reader)
Expand All @@ -170,10 +247,13 @@ async def broadcast_only(self, request_event: RequestEvent) -> None:
This function establishes a Unix socket connection to an Endpoint and
sends a RequestEvent without waiting for a ResponseEvent.
"""
# filepath = self.server_id + ".ipc"
filepath = "p2p_endpoint.ipc"
if sys.platform == 'win32':
path = r'\\.\pipe\p2p_endpoint'
else:
path = "p2p_endpoint.ipc"

try:
_, writer = await asyncio.open_unix_connection(filepath)
_, writer = await open_ipc_connection(path)
except ConnectionRefusedError:
return

Expand Down
152 changes: 80 additions & 72 deletions voltaire_bundler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from functools import partial
from signal import SIGINT, SIGTERM

import uvloop

from voltaire_bundler.execution_endpoint import ExecutionEndpoint
from voltaire_bundler.metrics.metrics import run_metrics_server
from voltaire_bundler.p2p_boot import p2p_boot
Expand All @@ -17,6 +15,14 @@
from .cli_manager import get_init_data, initialize_argument_parser
from .rpc.rpc_http_server import run_rpc_http_server

def is_uvloop_supported() -> bool:
    """Return True on platforms where uvloop can be used as the event loop."""
    platform = sys.platform
    if platform.startswith('freebsd'):
        return True
    return platform in ('darwin', 'linux')

if is_uvloop_supported():
    # Set `uvloop` as the default event loop policy.
    # Imported lazily inside the guard because uvloop is not installed
    # on Windows (pyproject marker: sys_platform != 'win32').
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


async def main(cmd_args=sys.argv[1:], loop=None) -> None:
argument_parser: ArgumentParser = initialize_argument_parser()
Expand Down Expand Up @@ -47,78 +53,80 @@ async def main(cmd_args=sys.argv[1:], loop=None) -> None:
else:
p2p_process = None

for signal_enum in [SIGINT, SIGTERM]:
exit_func = partial(
immediate_exit, signal_enum=signal_enum, loop=loop, p2p=p2p_process
)
loop.add_signal_handler(signal_enum, exit_func)

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
if not sys.platform.startswith("win"):
for signal_enum in [SIGINT, SIGTERM]:
exit_func = partial(
immediate_exit, signal_enum=signal_enum, loop=loop, p2p=p2p_process
)
loop.add_signal_handler(signal_enum, exit_func)

async with asyncio.TaskGroup() as task_group:
execution_endpoint: ExecutionEndpoint = ExecutionEndpoint(
init_data.ethereum_node_url,
init_data.bundler_pk,
init_data.bundler_address,
init_data.chain_id,
init_data.tracer,
init_data.is_debug,
init_data.is_legacy_mode,
init_data.conditional_rpc,
init_data.flashbots_protect_node_url,
init_data.bundle_interval,
init_data.max_fee_per_gas_percentage_multiplier,
init_data.max_priority_fee_per_gas_percentage_multiplier,
init_data.enforce_gas_price_tolerance,
init_data.ethereum_node_debug_trace_call_url,
init_data.ethereum_node_eth_get_logs_url,
init_data.disable_p2p,
init_data.max_verification_gas,
init_data.max_call_data_gas,
init_data.disable_v6,
init_data.logs_incremental_range,
init_data.logs_number_of_ranges,
init_data.reputation_whitelist,
init_data.reputation_blacklist,
init_data.native_tracer_node_url,
init_data.min_stake,
init_data.min_unstake_delay,
init_data.max_bundle_gas_limit
)
task_group.create_task(execution_endpoint.start_execution_endpoint())
try:
async with asyncio.TaskGroup() as task_group:
execution_endpoint: ExecutionEndpoint = ExecutionEndpoint(
init_data.ethereum_node_url,
init_data.bundler_pk,
init_data.bundler_address,
init_data.chain_id,
init_data.tracer,
init_data.is_debug,
init_data.is_legacy_mode,
init_data.conditional_rpc,
init_data.flashbots_protect_node_url,
init_data.bundle_interval,
init_data.max_fee_per_gas_percentage_multiplier,
init_data.max_priority_fee_per_gas_percentage_multiplier,
init_data.enforce_gas_price_tolerance,
init_data.ethereum_node_debug_trace_call_url,
init_data.ethereum_node_eth_get_logs_url,
init_data.disable_p2p,
init_data.max_verification_gas,
init_data.max_call_data_gas,
init_data.disable_v6,
init_data.logs_incremental_range,
init_data.logs_number_of_ranges,
init_data.reputation_whitelist,
init_data.reputation_blacklist,
init_data.native_tracer_node_url,
init_data.min_stake,
init_data.min_unstake_delay,
init_data.max_bundle_gas_limit
)
task_group.create_task(execution_endpoint.start_execution_endpoint())

node_urls_to_check = [init_data.ethereum_node_url]
if init_data.ethereum_node_url != init_data.ethereum_node_debug_trace_call_url:
node_urls_to_check.append(init_data.ethereum_node_debug_trace_call_url)
if init_data.ethereum_node_url != init_data.ethereum_node_eth_get_logs_url:
node_urls_to_check.append(init_data.ethereum_node_eth_get_logs_url)
node_urls_to_check = [init_data.ethereum_node_url]
if init_data.ethereum_node_url != init_data.ethereum_node_debug_trace_call_url:
node_urls_to_check.append(init_data.ethereum_node_debug_trace_call_url)
if init_data.ethereum_node_url != init_data.ethereum_node_eth_get_logs_url:
node_urls_to_check.append(init_data.ethereum_node_eth_get_logs_url)

task_group.create_task(
run_rpc_http_server(
node_urls_to_check=node_urls_to_check,
target_chain_id_hex=hex(init_data.chain_id),
bundler=init_data.bundler_address,
min_balance=init_data.min_bundler_balance,
host=init_data.rpc_url,
rpc_cors_domain=init_data.rpc_cors_domain,
port=init_data.rpc_port,
is_debug=init_data.is_debug,
)
)
if init_data.is_metrics:
run_metrics_server(
host=init_data.rpc_url,
task_group.create_task(
run_rpc_http_server(
node_urls_to_check=node_urls_to_check,
target_chain_id_hex=hex(init_data.chain_id),
bundler=init_data.bundler_address,
min_balance=init_data.min_bundler_balance,
host=init_data.rpc_url,
rpc_cors_domain=init_data.rpc_cors_domain,
port=init_data.rpc_port,
is_debug=init_data.is_debug,
)
)
if init_data.health_check_interval > 0:
try:
await asyncio.ensure_future(
periodic_health_check_cron_job(
node_urls_to_check=node_urls_to_check,
target_chain_id_hex=hex(init_data.chain_id),
bundler=init_data.bundler_address,
min_balance=init_data.min_bundler_balance,
interval=init_data.health_check_interval
)
if init_data.is_metrics:
run_metrics_server(
host=init_data.rpc_url,
)
except ValueError as excp:
logging.exception(str(excp))
if init_data.health_check_interval > 0:
try:
await asyncio.ensure_future(
periodic_health_check_cron_job(
node_urls_to_check=node_urls_to_check,
target_chain_id_hex=hex(init_data.chain_id),
bundler=init_data.bundler_address,
min_balance=init_data.min_bundler_balance,
interval=init_data.health_check_interval
)
)
except ValueError as excp:
logging.exception(str(excp))
except asyncio.exceptions.CancelledError:
pass