diff --git a/docs/source/api/exchange/binance/schema.rst b/docs/source/api/exchange/binance/schema.rst index 680d8ca..f5e451f 100644 --- a/docs/source/api/exchange/binance/schema.rst +++ b/docs/source/api/exchange/binance/schema.rst @@ -57,7 +57,7 @@ WebSocket Message Models :undoc-members: :show-inheritance: -.. autoclass:: BinanceAccountUpdateMsg +.. autoclass:: BinanceFuturesUpdateMsg :members: :undoc-members: :show-inheritance: diff --git a/nexustrader/exchange/binance/connector.py b/nexustrader/exchange/binance/connector.py index ed24d1c..96852a0 100644 --- a/nexustrader/exchange/binance/connector.py +++ b/nexustrader/exchange/binance/connector.py @@ -37,6 +37,8 @@ BinanceFuturesOrderUpdateMsg, BinanceSpotAccountInfo, BinanceFuturesAccountInfo, + BinanceSpotUpdateMsg, + BinanceFuturesUpdateMsg, ) from nexustrader.core.cache import AsyncCache from nexustrader.core.nautilius_core import MessageBus @@ -272,6 +274,13 @@ def __init__( self._ws_msg_futures_order_update_decoder = msgspec.json.Decoder( BinanceFuturesOrderUpdateMsg ) + self._ws_msg_spot_account_update_decoder = msgspec.json.Decoder( + BinanceSpotUpdateMsg + ) + self._ws_msg_futures_account_update_decoder = msgspec.json.Decoder( + BinanceFuturesUpdateMsg + ) + async def _init_account_balance(self): if self._account_type.is_spot or self._account_type.is_isolated_margin_or_margin: @@ -382,16 +391,64 @@ async def connect(self): def _ws_msg_handler(self, raw: bytes): try: + self._log.debug(f"Received message: {raw}") msg = self._ws_msg_general_decoder.decode(raw) if msg.e: match msg.e: - case BinanceUserDataStreamWsEventType.ORDER_TRADE_UPDATE: + case BinanceUserDataStreamWsEventType.ORDER_TRADE_UPDATE: # futures order update self._parse_order_trade_update(raw) - case BinanceUserDataStreamWsEventType.EXECUTION_REPORT: + case BinanceUserDataStreamWsEventType.EXECUTION_REPORT: # spot order update self._parse_execution_report(raw) + case BinanceUserDataStreamWsEventType.ACCOUNT_UPDATE: # futures account update + self._parse_account_update(raw) + case BinanceUserDataStreamWsEventType.OUT_BOUND_ACCOUNT_POSITION: # spot account update + self._parse_out_bound_account_position(raw) except msgspec.DecodeError: self._log.error(f"Error decoding message: {str(raw)}") - + + def _parse_out_bound_account_position(self, raw: bytes): + res = self._ws_msg_spot_account_update_decoder.decode(raw) + balances = res.parse_to_balances() + self._cache._apply_balance(account_type=self._account_type, balances=balances) + + def _parse_account_update(self, raw: bytes): + res = self._ws_msg_futures_account_update_decoder.decode(raw) + balances = res.a.parse_to_balances() + self._cache._apply_balance(account_type=self._account_type, balances=balances) + + event_unit = res.fs + for position in res.a.P: + if event_unit == BinanceBusinessUnit.UM: + id = position.s + "_linear" + symbol = self._market_id[id] + elif event_unit == BinanceBusinessUnit.CM: + id = position.s + "_inverse" + symbol = self._market_id[id] + else: + id = position.s + self.market_type + symbol = self._market_id[id] + + signed_amount = Decimal(position.pa) + side = position.ps.parse_to_position_side() + if signed_amount == 0: + side = None # 0 means no position side + else: + if side == PositionSide.FLAT: + if signed_amount > 0: + side = PositionSide.LONG + elif signed_amount < 0: + side = PositionSide.SHORT + position = Position( + symbol=symbol, + exchange=self._exchange_id, + signed_amount=signed_amount, + side=side, + entry_price=float(position.ep), + unrealized_pnl=float(position.up), + realized_pnl=float(position.cr), + ) + self._cache._apply_position(position) + def _parse_order_trade_update(self, raw: bytes) -> Order: res = self._ws_msg_futures_order_update_decoder.decode(raw) diff --git a/nexustrader/exchange/binance/rest_api.py b/nexustrader/exchange/binance/rest_api.py index 2501797..360eb46 100644 --- a/nexustrader/exchange/binance/rest_api.py +++ b/nexustrader/exchange/binance/rest_api.py @@ -56,12 +56,14 @@ async def _fetch( endpoint: str, payload: Dict[str, Any] = None, signed: bool = False, + required_timestamp: bool = True, ) -> Any: self._init_session() url = urljoin(base_url, endpoint) payload = payload or {} - payload["timestamp"] = self._clock.timestamp_ms() + if required_timestamp: + payload["timestamp"] = self._clock.timestamp_ms() payload = urlencode(payload) if signed: @@ -122,7 +124,7 @@ async def put_dapi_v1_listen_key(self): """ base_url = self._get_base_url(BinanceAccountType.COIN_M_FUTURE) end_point = "/dapi/v1/listenKey" - raw = await self._fetch("PUT", base_url, end_point) + raw = await self._fetch("PUT", base_url, end_point, required_timestamp=False) return orjson.loads(raw) async def post_dapi_v1_listen_key(self): @@ -131,7 +133,7 @@ async def post_dapi_v1_listen_key(self): """ base_url = self._get_base_url(BinanceAccountType.COIN_M_FUTURE) end_point = "/dapi/v1/listenKey" - raw = await self._fetch("POST", base_url, end_point) + raw = await self._fetch("POST", base_url, end_point, required_timestamp=False) return self._listen_key_decoder.decode(raw) async def post_api_v3_user_data_stream(self) -> BinanceListenKey: @@ -140,7 +142,7 @@ async def post_api_v3_user_data_stream(self) -> BinanceListenKey: """ base_url = self._get_base_url(BinanceAccountType.SPOT) end_point = "/api/v3/userDataStream" - raw = await self._fetch("POST", base_url, end_point) + raw = await self._fetch("POST", base_url, end_point, required_timestamp=False) return self._listen_key_decoder.decode(raw) async def put_api_v3_user_data_stream(self, listen_key: str): @@ -150,7 +152,7 @@ async def put_api_v3_user_data_stream(self, listen_key: str): base_url = self._get_base_url(BinanceAccountType.SPOT) end_point = "/api/v3/userDataStream" raw = await self._fetch( - "PUT", base_url, end_point, payload={"listenKey": listen_key} + "PUT", base_url, end_point, payload={"listenKey": listen_key}, required_timestamp=False ) return orjson.loads(raw) @@ -160,7 +162,7 @@ async def post_sapi_v1_user_data_stream(self) -> BinanceListenKey: """ base_url = self._get_base_url(BinanceAccountType.MARGIN) end_point = "/sapi/v1/userDataStream" - raw = await self._fetch("POST", base_url, end_point) + raw = await self._fetch("POST", base_url, end_point, required_timestamp=False) return self._listen_key_decoder.decode(raw) async def put_sapi_v1_user_data_stream(self, listen_key: str): @@ -170,7 +172,7 @@ async def put_sapi_v1_user_data_stream(self, listen_key: str): base_url = self._get_base_url(BinanceAccountType.MARGIN) end_point = "/sapi/v1/userDataStream" raw = await self._fetch( - "PUT", base_url, end_point, payload={"listenKey": listen_key} + "PUT", base_url, end_point, payload={"listenKey": listen_key}, required_timestamp=False ) return orjson.loads(raw) @@ -180,7 +182,7 @@ async def post_sapi_v1_user_data_stream_isolated(self, symbol: str) -> BinanceLi """ base_url = self._get_base_url(BinanceAccountType.ISOLATED_MARGIN) end_point = "/sapi/v1/userDataStream/isolated" - raw = await self._fetch("POST", base_url, end_point, payload={"symbol": symbol}) + raw = await self._fetch("POST", base_url, end_point, payload={"symbol": symbol}, required_timestamp=False) return self._listen_key_decoder.decode(raw) async def put_sapi_v1_user_data_stream_isolated(self, symbol: str, listen_key: str): @@ -194,6 +196,7 @@ async def put_sapi_v1_user_data_stream_isolated(self, symbol: str, listen_key: s base_url, end_point, payload={"symbol": symbol, "listenKey": listen_key}, + required_timestamp=False ) return orjson.loads(raw) @@ -203,7 +206,7 @@ async def post_fapi_v1_listen_key(self) -> BinanceListenKey: """ base_url = self._get_base_url(BinanceAccountType.USD_M_FUTURE) end_point = "/fapi/v1/listenKey" - raw = await self._fetch("POST", base_url, end_point) + raw = await self._fetch("POST", base_url, end_point, required_timestamp=False) return self._listen_key_decoder.decode(raw) async def put_fapi_v1_listen_key(self): @@ -212,7 +215,7 @@ async def put_fapi_v1_listen_key(self): """ base_url = self._get_base_url(BinanceAccountType.USD_M_FUTURE) end_point = "/fapi/v1/listenKey" - raw = await self._fetch("PUT", base_url, end_point) + raw = await self._fetch("PUT", base_url, end_point, required_timestamp=False) return orjson.loads(raw) async def post_papi_v1_listen_key(self) -> BinanceListenKey: @@ -221,7 +224,7 @@ async def post_papi_v1_listen_key(self) -> BinanceListenKey: """ base_url = self._get_base_url(BinanceAccountType.PORTFOLIO_MARGIN) end_point = "/papi/v1/listenKey" - raw = await self._fetch("POST", base_url, end_point) + raw = await self._fetch("POST", base_url, end_point, required_timestamp=False) return self._listen_key_decoder.decode(raw) async def put_papi_v1_listen_key(self): @@ -230,7 +233,7 @@ async def put_papi_v1_listen_key(self): """ base_url = self._get_base_url(BinanceAccountType.PORTFOLIO_MARGIN) end_point = "/papi/v1/listenKey" - raw = await self._fetch("PUT", base_url, end_point) + raw = await self._fetch("PUT", base_url, end_point, required_timestamp=False) return orjson.loads(raw) async def post_sapi_v1_margin_order( diff --git a/nexustrader/exchange/binance/schema.py b/nexustrader/exchange/binance/schema.py index cd481eb..ea837e6 100644 --- a/nexustrader/exchange/binance/schema.py +++ b/nexustrader/exchange/binance/schema.py @@ -407,13 +407,20 @@ class BinanceMarket(BaseMarket): feeSide: str -class BinanceAccountBalanceData(msgspec.Struct): +class BinanceFuturesBalanceData(msgspec.Struct): a: str - wb: str - cw: str - bc: str + wb: str # wallet balance + cw: str # cross wallet balance + bc: str # wallet change except PnL and Commission + + def parse_to_balance(self) -> Balance: + return Balance( + asset=self.a, + free=Decimal(self.wb), + locked=Decimal(0), + ) -class BinanceAccountPositionData(msgspec.Struct, kw_only=True): +class BinanceFuturesPositionData(msgspec.Struct, kw_only=True): s: str pa: str # position amount ep: str # entry price @@ -424,14 +431,40 @@ class BinanceAccountPositionData(msgspec.Struct, kw_only=True): iw: str | None = None # isolated wallet (if isolated position) ps: BinancePositionSide -class BinanceAccountUpdateData(msgspec.Struct, kw_only=True): +class BinanceFuturesUpdateData(msgspec.Struct, kw_only=True): m: BinanceAccountEventReasonType - B: list[BinanceAccountBalanceData] - P: list[BinanceAccountPositionData] + B: list[BinanceFuturesBalanceData] + P: list[BinanceFuturesPositionData] + + def parse_to_balances(self) -> List[Balance]: + return [balance.parse_to_balance() for balance in self.B] + -class BinanceAccountUpdateMsg(msgspec.Struct, kw_only=True): +class BinanceFuturesUpdateMsg(msgspec.Struct, kw_only=True): e: BinanceUserDataStreamWsEventType E: int T: int fs: BinanceBusinessUnit | None = None - a: BinanceAccountUpdateData + a: BinanceFuturesUpdateData + + +class BinanceSpotBalanceData(msgspec.Struct): + a: str # asset + f: str # free + l: str # locked + + def parse_to_balance(self) -> Balance: + return Balance( + asset=self.a, + free=Decimal(self.f), + locked=Decimal(self.l), + ) + +class BinanceSpotUpdateMsg(msgspec.Struct, kw_only=True): + e: BinanceUserDataStreamWsEventType # event type + E: int # event time + u: int # Time of last account update + B: list[BinanceSpotBalanceData] # balance array of the account + + def parse_to_balances(self) -> List[Balance]: + return [balance.parse_to_balance() for balance in self.B]