Skip to content

Commit

Permalink
Fix consensus client (#6)
Browse files Browse the repository at this point in the history
* Fix consensus client

* Fix event scanner to block check

* Increase max scan chunk size

* Fix log message

* Add remove, cleanup to ipfs upload client

* Revert execution client change

* Fix max chunk size
  • Loading branch information
tsudmi authored Dec 12, 2022
1 parent 7d5031d commit 08c0046
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sw-utils"
version = "0.2.0"
version = "0.2.1"
description = "StakeWise Python utils"
authors = ["StakeWise Labs <info@stakewise.io>"]
license = "GPL-3.0-or-later"
Expand Down
2 changes: 1 addition & 1 deletion sw_utils/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ async def get_validators_by_ids(
return await self._async_make_get_request(endpoint)


async def get_consensus_client(endpoint: str) -> ExtendedAsyncBeacon:
def get_consensus_client(endpoint: str) -> ExtendedAsyncBeacon:
    """Build an ``ExtendedAsyncBeacon`` client pointed at *endpoint*.

    Plain (non-async) factory: constructing the client performs no I/O,
    so there is nothing to await.
    """
    client = ExtendedAsyncBeacon(base_url=endpoint)
    return client
15 changes: 13 additions & 2 deletions sw_utils/event_scanner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from abc import ABC, abstractmethod
from asyncio import sleep
from typing import Any, Dict, List, Optional, Tuple
Expand All @@ -6,6 +7,8 @@
from web3.contract import AsyncContract
from web3.types import EventData

logger = logging.getLogger(__name__)


class EventScannerState(ABC):
"""
Expand Down Expand Up @@ -33,7 +36,7 @@ def process_events(self, events: List[EventData]) -> None:

class EventScanner:
min_scan_chunk_size = 10
max_scan_chunk_size = 100000
max_scan_chunk_size = 1_000_000
chunk_size_multiplier = 2
max_request_retries = 30
request_retry_seconds = 3
Expand All @@ -47,13 +50,14 @@ def __init__(
):
self.state = state
self.argument_filters = argument_filters
self.contract_event = contract_event
self._contract_call = lambda from_block, to_block: getattr(
contract.events, contract_event
).getLogs(argument_filters=argument_filters, fromBlock=from_block, toBlock=to_block)

async def process_new_events(self, to_block: BlockNumber) -> None:
current_from_block = self.state.get_from_block()
if current_from_block < to_block:
if current_from_block >= to_block:
raise ValueError('Invalid to block')

# Scan in chunks, commit between
Expand All @@ -66,6 +70,13 @@ async def process_new_events(self, to_block: BlockNumber) -> None:
)
self.state.process_events(new_events)

logger.info(
'Scanned %s events: %d/%d blocks',
self.contract_event,
current_to_block,
to_block
)

# Try to guess how many blocks to fetch over `eth_getLogs` API next time
chunk_size = self._estimate_next_chunk_size(chunk_size)

Expand Down
34 changes: 21 additions & 13 deletions sw_utils/ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@
import ipfshttpclient
from aiohttp import ClientSession, ClientTimeout

from sw_utils.common import LimitedSizeDict

timeout = ClientTimeout(total=60)

logger = logging.getLogger(__name__)

CACHE_SIZE = 1024
IPFS_CACHE: Dict[str, bytes] = LimitedSizeDict(size_limit=CACHE_SIZE)


def _strip_ipfs_prefix(ipfs_hash: str) -> str:
return ipfs_hash.replace('ipfs://', '').replace('/ipfs/', '')
Expand Down Expand Up @@ -44,6 +39,22 @@ async def upload(self, data: str) -> str:

return _strip_ipfs_prefix(ipfs_id)

    async def remove(self, ipfs_hash: str) -> None:
        """Unpin *ipfs_hash* (with any ipfs:// or /ipfs/ markers stripped) from the node.

        NOTE(review): declared ``async`` but ``ipfshttpclient`` calls here are
        synchronous/blocking — confirm callers tolerate the event loop being
        blocked for the duration of the request.
        """
        with ipfshttpclient.connect(
            self.endpoint,
            username=self.username,
            password=self.password,
        ) as client:
            client.pin.rm(_strip_ipfs_prefix(ipfs_hash))

    async def cleanup(self) -> None:
        """Run garbage collection on the IPFS node's repo, freeing unpinned data.

        ``quiet=True`` suppresses the per-object output of the gc call.
        NOTE(review): declared ``async`` but the ``ipfshttpclient`` call is
        synchronous/blocking — confirm this is acceptable to callers.
        """
        with ipfshttpclient.connect(
            self.endpoint,
            username=self.username,
            password=self.password,
        ) as client:
            client.repo.gc(quiet=True)


class PinataUploadClient(BaseUploadClient):
endpoint = 'https://api.pinata.cloud/pinning/pinJSONToIPFS'
Expand Down Expand Up @@ -104,26 +115,23 @@ def __init__(self, endpoints: list[str]):
self.endpoints = endpoints

@staticmethod
async def _fetch_ipfs(endpoint: str, ipfs_hash: str) -> bytes:
async def _fetch_ipfs(endpoint: str, ipfs_hash: str) -> str:
with ipfshttpclient.connect(
endpoint,
) as client:
return client.cat(ipfs_hash)
return client.cat(ipfs_hash).decode('utf-8')

@staticmethod
async def _fetch_http(endpoint: str, ipfs_hash: str) -> bytes:
async def _fetch_http(endpoint: str, ipfs_hash: str) -> str:
async with ClientSession(timeout=timeout) as session:
response = await session.get(f"{endpoint.rstrip('/')}/ipfs/{ipfs_hash}")
response.raise_for_status()

return await response.read()
return (await response.read()).decode('utf-8')

async def fetch(self, ipfs_hash: str) -> bytes:
async def fetch(self, ipfs_hash: str) -> str:
"""Tries to fetch IPFS hash from different sources."""
ipfs_hash = _strip_ipfs_prefix(ipfs_hash)
if IPFS_CACHE.get(ipfs_hash):
return IPFS_CACHE[ipfs_hash]

for endpoint in self.endpoints:
try:
if endpoint.startswith('http'):
Expand Down

0 comments on commit 08c0046

Please sign in to comment.