-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbookticker-async-single.py
119 lines (97 loc) · 3.6 KB
/
bookticker-async-single.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
Binance - bookTicker (bid price, bid volume, ask price, ask volume) - real-time data
- Change 'counter' and 'interval' based on storage requirements
"""
import asyncio
import json
import csv
import os
from datetime import datetime, timezone
import websockets
# Global variables for WebSocket connection and CSV writer
csv_writer = None
csv_file = None
save_counter = 0
async def on_message(message):
global csv_writer
message = json.loads(message)
if 's' in message and message['s'] == symbol:
bid_price = float(message['b'])
bid_volume = float(message['B'])
ask_price = float(message['a'])
ask_volume = float(message['A'])
timestamp = datetime.now(timezone.utc).isoformat(sep=' ', timespec='microseconds')
csv_writer.writerow([timestamp, bid_price, bid_volume, ask_price, ask_volume])
# Function to handle WebSocket errors
def on_error(error):
print(error)
# Function to open the WebSocket connection and subscribe to the order book channel
async def on_open_factory(symbol):
print("WebSocket connection opened")
await ws.send(json.dumps({
"method": "SUBSCRIBE",
"params": [
f"{symbol.lower()}@bookTicker",
],
"id": 1
}))
# Function to periodically save the CSV file
async def save_csv_periodically(csv_file_path, symbol, interval):
global csv_writer, save_counter
while True:
await asyncio.sleep(interval)
# Increment the save counter
save_counter += 1
# Check if it's time to create a new CSV file - save new file every 10 mins
if save_counter % 60 == 0:
csv_file_path = create_csv_file(symbol)
# Function to create a new CSV file
def create_csv_file(symbol):
# Create a new folder for each new calendar day
current_date = datetime.now(timezone.utc).strftime("%Y%m%d")
folder_path = f"data/{symbol}/{current_date}"
os.makedirs(folder_path, exist_ok=True)
# Create a new CSV file with the current timestamp
current_time = datetime.now(timezone.utc)
csv_filename = f'{folder_path}/bid_ask_data_{symbol}_{current_time.strftime("%Y%m%d_%H%M%S")}.csv'
global csv_writer, csv_file
if csv_file is not None:
csv_file.close()
csv_file = open_csv_file(csv_filename)
csv_writer = csv.writer(csv_file)
csv_writer.writerow(['Timestamp', 'Bid Price', 'Bid Volume', 'Ask Price', 'Ask Volume'])
return csv_filename
def open_csv_file(csv_filename):
return open(csv_filename, mode='w', newline='', encoding='utf-8')
async def websocket_thread():
global ws
async with websockets.connect("wss://stream.binance.com:9443/ws") as ws:
await on_open_factory(symbol)
async for message in ws:
await on_message(message)
# Function to handle WebSocket connection closure
def on_close():
global csv_writer, csv_file
print("WebSocket connection closed")
csv_writer = None
if csv_file is not None:
csv_file.close()
if __name__ == "__main__":
symbol = 'BTCUSDT'
# Create the initial CSV file
csv_file_path = create_csv_file(symbol)
# Create an event loop
loop = asyncio.get_event_loop()
# Start the WebSocket thread in the event loop
websocket_task = loop.create_task(websocket_thread())
# Schedule the save_csv_periodically function to be called periodically
save_interval = 10
loop.create_task(save_csv_periodically(csv_file_path, symbol, save_interval))
try:
# Run the event loop
loop.run_until_complete(websocket_task)
except KeyboardInterrupt:
pass
finally:
# Close the event loop
loop.close()