diff --git a/bfxapi/_version.py b/bfxapi/_version.py index e5b84fb..b6aac9a 100644 --- a/bfxapi/_version.py +++ b/bfxapi/_version.py @@ -1 +1 @@ -__version__ = "3.0.0b4" +__version__ = "3.0.0b5" diff --git a/bfxapi/websocket/_client/bfx_websocket_bucket.py b/bfxapi/websocket/_client/bfx_websocket_bucket.py index e30ec18..fe98f3e 100644 --- a/bfxapi/websocket/_client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/_client/bfx_websocket_bucket.py @@ -67,8 +67,9 @@ async def start(self) -> None: if isinstance(message, list): if (chan_id := cast(int, message[0])) and \ - (message[1] != Connection._HEARTBEAT): - self.__handler.handle(self.__subscriptions[chan_id], message[1:]) + (subscription := self.__subscriptions.get(chan_id)) and \ + (message[1] != Connection._HEARTBEAT): + self.__handler.handle(subscription, message[1:]) def __on_subscribed(self, message: Dict[str, Any]) -> None: chan_id = cast(int, message["chan_id"]) diff --git a/examples/websocket/public/order_book.py b/examples/websocket/public/order_book.py index c79ade7..ff2cc7d 100644 --- a/examples/websocket/public/order_book.py +++ b/examples/websocket/public/order_book.py @@ -2,7 +2,7 @@ import zlib from collections import OrderedDict -from typing import Dict, List +from typing import Any, Dict, List, cast from bfxapi import Client from bfxapi.types import TradingPairBook @@ -15,8 +15,6 @@ def __init__(self, symbols: List[str]): symbol: {"bids": OrderedDict(), "asks": OrderedDict()} for symbol in symbols } - self.cooldown: Dict[str, bool] = {symbol: False for symbol in symbols} - def update(self, symbol: str, data: TradingPairBook) -> None: price, count, amount = data.price, data.count, data.amount @@ -66,6 +64,15 @@ def verify(self, symbol: str, checksum: int) -> bool: return crc32 == checksum + def is_verifiable(self, symbol: str) -> bool: + return ( + len(self.__order_book[symbol]["bids"]) >= 25 + and len(self.__order_book[symbol]["asks"]) >= 25 + ) + + def clear(self, symbol: str) -> None: + self.__order_book[symbol] = {"bids": OrderedDict(), "asks": OrderedDict()} + SYMBOLS = ["tLTCBTC", "tETHUSD", "tETHBTC"] @@ -100,17 +107,20 @@ def on_t_book_update(subscription: Book, data: TradingPairBook): async def on_checksum(subscription: Book, value: int): symbol = subscription["symbol"] - if order_book.verify(symbol, value): - order_book.cooldown[symbol] = False - elif not order_book.cooldown[symbol]: - print( - "Mismatch between local and remote checksums: " - f"restarting book for symbol <{symbol}>..." - ) + if order_book.is_verifiable(symbol): + if not order_book.verify(symbol, value): + print( + "Mismatch between local and remote checksums: " + f"restarting book for symbol <{symbol}>..." + ) + + _subscription = cast(Dict[str, Any], subscription.copy()) + + await bfx.wss.unsubscribe(sub_id=_subscription.pop("sub_id")) - await bfx.wss.resubscribe(sub_id=subscription["sub_id"]) + await bfx.wss.subscribe(**_subscription) - order_book.cooldown[symbol] = True + order_book.clear(symbol) bfx.wss.run() diff --git a/examples/websocket/public/raw_order_book.py b/examples/websocket/public/raw_order_book.py index fd6ebcc..b6c9fb2 100644 --- a/examples/websocket/public/raw_order_book.py +++ b/examples/websocket/public/raw_order_book.py @@ -2,7 +2,7 @@ import zlib from collections import OrderedDict -from typing import Dict, List +from typing import Any, Dict, List, cast from bfxapi import Client from bfxapi.types import TradingPairRawBook @@ -15,8 +15,6 @@ def __init__(self, symbols: List[str]): symbol: {"bids": OrderedDict(), "asks": OrderedDict()} for symbol in symbols } - self.cooldown: Dict[str, bool] = {symbol: False for symbol in symbols} - def update(self, symbol: str, data: TradingPairRawBook) -> None: order_id, price, amount = data.order_id, data.price, data.amount @@ -66,6 +64,15 @@ def verify(self, symbol: str, checksum: int) -> bool: return crc32 == checksum + def is_verifiable(self, symbol: str) -> bool: + return ( + len(self.__raw_order_book[symbol]["bids"]) >= 25 + and len(self.__raw_order_book[symbol]["asks"]) >= 25 + ) + + def clear(self, symbol: str) -> None: + self.__raw_order_book[symbol] = {"bids": OrderedDict(), "asks": OrderedDict()} + SYMBOLS = ["tLTCBTC", "tETHUSD", "tETHBTC"] @@ -100,17 +107,20 @@ def on_t_raw_book_update(subscription: Book, data: TradingPairRawBook): async def on_checksum(subscription: Book, value: int): symbol = subscription["symbol"] - if raw_order_book.verify(symbol, value): - raw_order_book.cooldown[symbol] = False - elif not raw_order_book.cooldown[symbol]: - print( - "Mismatch between local and remote checksums: " - f"restarting book for symbol <{symbol}>..." - ) + if raw_order_book.is_verifiable(symbol): + if not raw_order_book.verify(symbol, value): + print( + "Mismatch between local and remote checksums: " + f"restarting book for symbol <{symbol}>..." + ) + + _subscription = cast(Dict[str, Any], subscription.copy()) + + await bfx.wss.unsubscribe(sub_id=_subscription.pop("sub_id")) - await bfx.wss.resubscribe(sub_id=subscription["sub_id"]) + await bfx.wss.subscribe(**_subscription) - raw_order_book.cooldown[symbol] = True + raw_order_book.clear(symbol) bfx.wss.run()