-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
87 lines (67 loc) · 2.43 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
import socket
import time
import websockets
from web3._utils.threads import (
Timeout,
)
class PollDelayCounter:
def __init__(self, initial_delay=0, max_delay=1, initial_step=0.01):
self.initial_delay = initial_delay
self.initial_step = initial_step
self.max_delay = max_delay
self.current_delay = initial_delay
def __call__(self):
delay = self.current_delay
if self.current_delay == 0:
self.current_delay += self.initial_step
else:
self.current_delay *= 2
self.current_delay = min(self.current_delay, self.max_delay)
return delay
def reset(self):
self.current_delay = self.initial_delay
def get_open_port():
sock = socket.socket()
sock.bind(("127.0.0.1", 0))
port = sock.getsockname()[1]
sock.close()
return str(port)
async def wait_for_ws(endpoint_uri, timeout=10):
start = time.time()
while time.time() < start + timeout:
try:
async with websockets.connect(uri=endpoint_uri):
pass
except OSError:
await asyncio.sleep(0.01)
else:
break
async def _async_wait_for_block_fixture_logic(async_w3, block_number=1, timeout=None):
if not timeout:
current_block_number = await async_w3.eth.block_number # type:ignore
timeout = (block_number - current_block_number) * 3
poll_delay_counter = PollDelayCounter()
with Timeout(timeout) as timeout:
eth_block_number = await async_w3.eth.block_number
while eth_block_number < block_number:
await async_w3.manager.coro_request("evm_mine", [])
await timeout.async_sleep(poll_delay_counter())
eth_block_number = await async_w3.eth.block_number
async def _async_wait_for_transaction_fixture_logic(async_w3, txn_hash, timeout=120):
poll_delay_counter = PollDelayCounter()
with Timeout(timeout) as timeout:
while True:
txn_receipt = await async_w3.eth.get_transaction_receipt(txn_hash)
if txn_receipt is not None:
break
asyncio.sleep(poll_delay_counter())
timeout.check()
return txn_receipt
def async_partial(f, *args, **kwargs):
async def f2(*args2, **kwargs2):
result = f(*args, *args2, **kwargs, **kwargs2)
if asyncio.iscoroutinefunction(f):
result = await result
return result
return f2