Skip to content

Commit

Permalink
Add WebSocket support for Binance spot and futures account updates
Browse files Browse the repository at this point in the history
- Implemented WebSocket message handlers for spot and futures account updates
- Added new schema classes for parsing spot and futures account update messages
- Updated BinancePrivateConnector to decode and process account balance and position updates
- Modified REST API client to support optional timestamp in requests
- Updated documentation to reflect changes in account update message classes
  • Loading branch information
River-Shi committed Feb 10, 2025
1 parent 5d54080 commit 9f4a95a
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docs/source/api/exchange/binance/schema.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ WebSocket Message Models
:undoc-members:
:show-inheritance:

.. autoclass:: BinanceAccountUpdateMsg
.. autoclass:: BinanceFuturesUpdateMsg
:members:
:undoc-members:
:show-inheritance:
Expand Down
63 changes: 60 additions & 3 deletions nexustrader/exchange/binance/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
BinanceFuturesOrderUpdateMsg,
BinanceSpotAccountInfo,
BinanceFuturesAccountInfo,
BinanceSpotUpdateMsg,
BinanceFuturesUpdateMsg,
)
from nexustrader.core.cache import AsyncCache
from nexustrader.core.nautilius_core import MessageBus
Expand Down Expand Up @@ -272,6 +274,13 @@ def __init__(
self._ws_msg_futures_order_update_decoder = msgspec.json.Decoder(
BinanceFuturesOrderUpdateMsg
)
self._ws_msg_spot_account_update_decoder = msgspec.json.Decoder(
BinanceSpotUpdateMsg
)
self._ws_msg_futures_account_update_decoder = msgspec.json.Decoder(
BinanceFuturesUpdateMsg
)


async def _init_account_balance(self):
if self._account_type.is_spot or self._account_type.is_isolated_margin_or_margin:
Expand Down Expand Up @@ -382,16 +391,64 @@ async def connect(self):

def _ws_msg_handler(self, raw: bytes):
try:
self._log.debug(f"Received message: {raw}")
msg = self._ws_msg_general_decoder.decode(raw)
if msg.e:
match msg.e:
case BinanceUserDataStreamWsEventType.ORDER_TRADE_UPDATE:
case BinanceUserDataStreamWsEventType.ORDER_TRADE_UPDATE: # futures order update
self._parse_order_trade_update(raw)
case BinanceUserDataStreamWsEventType.EXECUTION_REPORT:
case BinanceUserDataStreamWsEventType.EXECUTION_REPORT: # spot order update
self._parse_execution_report(raw)
case BinanceUserDataStreamWsEventType.ACCOUNT_UPDATE: # futures account update
self._parse_account_update(raw)
case BinanceUserDataStreamWsEventType.OUT_BOUND_ACCOUNT_POSITION: # spot account update
self._parse_out_bound_account_position(raw)
except msgspec.DecodeError:
self._log.error(f"Error decoding message: {str(raw)}")


def _parse_out_bound_account_position(self, raw: bytes):
res = self._ws_msg_spot_account_update_decoder.decode(raw)
balances = res.parse_to_balances()
self._cache._apply_balance(account_type=self._account_type, balances=balances)

def _parse_account_update(self, raw: bytes):
res = self._ws_msg_futures_account_update_decoder.decode(raw)
balances = res.a.parse_to_balances()
self._cache._apply_balance(account_type=self._account_type, balances=balances)

event_unit = res.fs
for position in res.a.P:
if event_unit == BinanceBusinessUnit.UM:
id = position.s + "_linear"
symbol = self._market_id[id]
elif event_unit == BinanceBusinessUnit.CM:
id = position.s + "_inverse"
symbol = self._market_id[id]
else:
id = position.s + self.market_type
symbol = self._market_id[id]

signed_amount = Decimal(position.pa)
side = position.ps.parse_to_position_side()
if signed_amount == 0:
side = None # 0 means no position side
else:
if side == PositionSide.FLAT:
if signed_amount > 0:
side = PositionSide.LONG
elif signed_amount < 0:
side = PositionSide.SHORT
position = Position(
symbol=symbol,
exchange=self._exchange_id,
signed_amount=signed_amount,
side=side,
entry_price=float(position.ep),
unrealized_pnl=float(position.up),
realized_pnl=float(position.cr),
)
self._cache._apply_position(position)

def _parse_order_trade_update(self, raw: bytes) -> Order:
res = self._ws_msg_futures_order_update_decoder.decode(raw)

Expand Down
27 changes: 15 additions & 12 deletions nexustrader/exchange/binance/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ async def _fetch(
endpoint: str,
payload: Dict[str, Any] = None,
signed: bool = False,
required_timestamp: bool = True,
) -> Any:
self._init_session()

url = urljoin(base_url, endpoint)
payload = payload or {}
payload["timestamp"] = self._clock.timestamp_ms()
if required_timestamp:
payload["timestamp"] = self._clock.timestamp_ms()
payload = urlencode(payload)

if signed:
Expand Down Expand Up @@ -122,7 +124,7 @@ async def put_dapi_v1_listen_key(self):
"""
base_url = self._get_base_url(BinanceAccountType.COIN_M_FUTURE)
end_point = "/dapi/v1/listenKey"
raw = await self._fetch("PUT", base_url, end_point)
raw = await self._fetch("PUT", base_url, end_point, required_timestamp=False)
return orjson.loads(raw)

async def post_dapi_v1_listen_key(self):
Expand All @@ -131,7 +133,7 @@ async def post_dapi_v1_listen_key(self):
"""
base_url = self._get_base_url(BinanceAccountType.COIN_M_FUTURE)
end_point = "/dapi/v1/listenKey"
raw = await self._fetch("POST", base_url, end_point)
raw = await self._fetch("POST", base_url, end_point, required_timestamp=False)
return self._listen_key_decoder.decode(raw)

async def post_api_v3_user_data_stream(self) -> BinanceListenKey:
Expand All @@ -140,7 +142,7 @@ async def post_api_v3_user_data_stream(self) -> BinanceListenKey:
"""
base_url = self._get_base_url(BinanceAccountType.SPOT)
end_point = "/api/v3/userDataStream"
raw = await self._fetch("POST", base_url, end_point)
raw = await self._fetch("POST", base_url, end_point, required_timestamp=False)
return self._listen_key_decoder.decode(raw)

async def put_api_v3_user_data_stream(self, listen_key: str):
Expand All @@ -150,7 +152,7 @@ async def put_api_v3_user_data_stream(self, listen_key: str):
base_url = self._get_base_url(BinanceAccountType.SPOT)
end_point = "/api/v3/userDataStream"
raw = await self._fetch(
"PUT", base_url, end_point, payload={"listenKey": listen_key}
"PUT", base_url, end_point, payload={"listenKey": listen_key}, required_timestamp=False
)
return orjson.loads(raw)

Expand All @@ -160,7 +162,7 @@ async def post_sapi_v1_user_data_stream(self) -> BinanceListenKey:
"""
base_url = self._get_base_url(BinanceAccountType.MARGIN)
end_point = "/sapi/v1/userDataStream"
raw = await self._fetch("POST", base_url, end_point)
raw = await self._fetch("POST", base_url, end_point, required_timestamp=False)
return self._listen_key_decoder.decode(raw)

async def put_sapi_v1_user_data_stream(self, listen_key: str):
Expand All @@ -170,7 +172,7 @@ async def put_sapi_v1_user_data_stream(self, listen_key: str):
base_url = self._get_base_url(BinanceAccountType.MARGIN)
end_point = "/sapi/v1/userDataStream"
raw = await self._fetch(
"PUT", base_url, end_point, payload={"listenKey": listen_key}
"PUT", base_url, end_point, payload={"listenKey": listen_key}, required_timestamp=False
)
return orjson.loads(raw)

Expand All @@ -180,7 +182,7 @@ async def post_sapi_v1_user_data_stream_isolated(self, symbol: str) -> BinanceLi
"""
base_url = self._get_base_url(BinanceAccountType.ISOLATED_MARGIN)
end_point = "/sapi/v1/userDataStream/isolated"
raw = await self._fetch("POST", base_url, end_point, payload={"symbol": symbol})
raw = await self._fetch("POST", base_url, end_point, payload={"symbol": symbol}, required_timestamp=False)
return self._listen_key_decoder.decode(raw)

async def put_sapi_v1_user_data_stream_isolated(self, symbol: str, listen_key: str):
Expand All @@ -194,6 +196,7 @@ async def put_sapi_v1_user_data_stream_isolated(self, symbol: str, listen_key: s
base_url,
end_point,
payload={"symbol": symbol, "listenKey": listen_key},
required_timestamp=False
)
return orjson.loads(raw)

Expand All @@ -203,7 +206,7 @@ async def post_fapi_v1_listen_key(self) -> BinanceListenKey:
"""
base_url = self._get_base_url(BinanceAccountType.USD_M_FUTURE)
end_point = "/fapi/v1/listenKey"
raw = await self._fetch("POST", base_url, end_point)
raw = await self._fetch("POST", base_url, end_point, required_timestamp=False)
return self._listen_key_decoder.decode(raw)

async def put_fapi_v1_listen_key(self):
Expand All @@ -212,7 +215,7 @@ async def put_fapi_v1_listen_key(self):
"""
base_url = self._get_base_url(BinanceAccountType.USD_M_FUTURE)
end_point = "/fapi/v1/listenKey"
raw = await self._fetch("PUT", base_url, end_point)
raw = await self._fetch("PUT", base_url, end_point, required_timestamp=False)
return orjson.loads(raw)

async def post_papi_v1_listen_key(self) -> BinanceListenKey:
Expand All @@ -221,7 +224,7 @@ async def post_papi_v1_listen_key(self) -> BinanceListenKey:
"""
base_url = self._get_base_url(BinanceAccountType.PORTFOLIO_MARGIN)
end_point = "/papi/v1/listenKey"
raw = await self._fetch("POST", base_url, end_point)
raw = await self._fetch("POST", base_url, end_point, required_timestamp=False)
return self._listen_key_decoder.decode(raw)

async def put_papi_v1_listen_key(self):
Expand All @@ -230,7 +233,7 @@ async def put_papi_v1_listen_key(self):
"""
base_url = self._get_base_url(BinanceAccountType.PORTFOLIO_MARGIN)
end_point = "/papi/v1/listenKey"
raw = await self._fetch("PUT", base_url, end_point)
raw = await self._fetch("PUT", base_url, end_point, required_timestamp=False)
return orjson.loads(raw)

async def post_sapi_v1_margin_order(
Expand Down
53 changes: 43 additions & 10 deletions nexustrader/exchange/binance/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,13 +407,20 @@ class BinanceMarket(BaseMarket):
feeSide: str


class BinanceAccountBalanceData(msgspec.Struct):
class BinanceFuturesBalanceData(msgspec.Struct):
a: str
wb: str
cw: str
bc: str
wb: str # wallet balance
cw: str # cross wallet balance
bc: str # wallet change except PnL and Commission

def parse_to_balance(self) -> Balance:
return Balance(
asset=self.a,
free=Decimal(self.wb),
locked=Decimal(0),
)

class BinanceAccountPositionData(msgspec.Struct, kw_only=True):
class BinanceFuturesPositionData(msgspec.Struct, kw_only=True):
s: str
pa: str # position amount
ep: str # entry price
Expand All @@ -424,14 +431,40 @@ class BinanceAccountPositionData(msgspec.Struct, kw_only=True):
iw: str | None = None # isolated wallet (if isolated position)
ps: BinancePositionSide

class BinanceAccountUpdateData(msgspec.Struct, kw_only=True):
class BinanceFuturesUpdateData(msgspec.Struct, kw_only=True):
m: BinanceAccountEventReasonType
B: list[BinanceAccountBalanceData]
P: list[BinanceAccountPositionData]
B: list[BinanceFuturesBalanceData]
P: list[BinanceFuturesPositionData]

def parse_to_balances(self) -> List[Balance]:
return [balance.parse_to_balance() for balance in self.B]


class BinanceAccountUpdateMsg(msgspec.Struct, kw_only=True):
class BinanceFuturesUpdateMsg(msgspec.Struct, kw_only=True):
e: BinanceUserDataStreamWsEventType
E: int
T: int
fs: BinanceBusinessUnit | None = None
a: BinanceAccountUpdateData
a: BinanceFuturesUpdateData


class BinanceSpotBalanceData(msgspec.Struct):
a: str # asset
f: str # free
l: str # locked

def parse_to_balance(self) -> Balance:
return Balance(
asset=self.a,
free=Decimal(self.f),
locked=Decimal(self.l),
)

class BinanceSpotUpdateMsg(msgspec.Struct, kw_only=True):
e: BinanceUserDataStreamWsEventType # event type
E: int # event time
u: int # Time of last account update
B: list[BinanceSpotBalanceData] # balance array of the account

def parse_to_balances(self) -> List[Balance]:
return [balance.parse_to_balance() for balance in self.B]

0 comments on commit 9f4a95a

Please sign in to comment.