Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to caching middleware #1742

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions brownie/network/middlewares/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,15 @@ def __init__(self, w3: Web3) -> None:
self.cur = Cursor(_get_data_folder().joinpath("cache.db"))
self.cur.execute(f"CREATE TABLE IF NOT EXISTS {self.table_key} (method, params, result)")

latest = w3.eth.get_block("latest")
self.last_block = latest.hash
self.last_block_seen = latest.timestamp
self.last_request = 0.0
self.block_cache: OrderedDict = OrderedDict()
self.block_filter = w3.eth.filter("latest")

self.lock = threading.Lock()
self.event = threading.Event()
self.is_killed = False
threading.Thread(target=self.block_filter_loop, daemon=True).start()
self.start_block_filter_loop()

def start_block_filter_loop(self) -> None:
    """Spawn the daemon thread running the block-filter loop.

    Blocks until the loop thread signals readiness via ``self.event``,
    so the caller can rely on the loop's state (block filter, cache)
    being initialized once this method returns. Also called from
    ``process_request`` to restart the loop after a crash.
    """
    # clear first so the wait below only observes the new thread's signal,
    # not a leftover set() from a previous (crashed) loop
    self.event.clear()
    self.loop_thread = threading.Thread(target=self.loop_exception_handler, daemon=True)
    self.loop_thread.start()
    # block_filter_loop sets the event once its state variables are initialized
    self.event.wait()

@classmethod
def get_layer(cls, w3: Web3, network_type: str) -> Optional[int]:
Expand All @@ -138,7 +136,25 @@ def get_layer(cls, w3: Web3, network_type: str) -> Optional[int]:
def time_since(self) -> float:
    """Seconds elapsed since the most recent RPC request."""
    now = time.time()
    return now - self.last_request

def loop_exception_handler(self) -> None:
    """Thread target wrapping ``block_filter_loop`` to contain crashes.

    An unhandled exception in the loop (typically a ConnectionError from
    the RPC endpoint) would otherwise print a traceback to the console.
    On failure the block cache is invalidated and the loop is flagged as
    dead so ``process_request`` can detect it and restart the loop.
    """
    try:
        self.block_filter_loop()
    except Exception:
        # catch unhandled exceptions to avoid random error messages in the console
        self.block_cache.clear()
        self.is_killed = True
        # BUGFIX: unblock any caller waiting in start_block_filter_loop().
        # If the loop crashed before its first event.set(), the waiter
        # would otherwise hang on event.wait() forever.
        self.event.set()

def block_filter_loop(self) -> None:
# initialize required state variables within the loop to avoid recursion death
latest = self.w3.eth.get_block("latest")
self.last_block = latest.hash
self.last_block_seen = latest.timestamp
self.last_request = time.time()
self.block_cache: OrderedDict = OrderedDict()
self.block_filter = self.w3.eth.filter("latest")
self.is_killed = False
self.event.set()

while not self.is_killed:
# if the last RPC request was > 60 seconds ago, reduce the rate of updates.
# we eventually settle at one query per minute after 10 minutes of no requests.
Expand Down Expand Up @@ -215,6 +231,10 @@ def process_request(self, make_request: Callable, method: str, params: List) ->
data = HexBytes(data)
return {"id": "cache", "jsonrpc": "2.0", "result": data}

if not self.loop_thread.is_alive():
# restart the block filter loop if it has crashed (usually from a ConnectionError)
self.start_block_filter_loop()

with self.lock:
self.last_request = time.time()
self.event.set()
Expand Down
Loading