-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrates.py
40 lines (30 loc) · 1.09 KB
/
rates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import http.client
import cachetools.func
import requests
import json
from concurrent.futures import ThreadPoolExecutor
# Quote-asset suffix appended to each currency code to build a trading-pair
# symbol, e.g. "BTC" + "USDT" -> "BTCUSDT".
USD = "USDT"

# Base URL of the Binance REST API; endpoint paths below are appended to it.
URL = "https://api.binance.com"

# Endpoint paths keyed by purpose. Only "price" is referenced by the code
# visible in this module.
API_TYPES = {
    "price": "/api/v3/avgPrice",
    "time": "/api/v3/time",
}
@cachetools.func.ttl_cache(maxsize=20, ttl=60)
def _get_rates_cached(currs):
    """Cached worker behind get_rates_cached; *currs* must be hashable."""
    return get_rates_concurrent(currs)


def get_rates_cached(currs=("BTC", "ETH")):
    """Return rates for *currs*, reusing a result for up to 60 seconds.

    The cache key is built from the argument, which therefore has to be
    hashable. Lists are not, so any non-string iterable is converted to a
    tuple first; previously passing a list raised ``TypeError``.

    Args:
        currs: A single currency code (str) or an iterable of currency
            codes. A string is passed through unchanged.

    Returns:
        Whatever ``get_rates_concurrent`` returns for the same currencies.
        NOTE: the same object is handed to every caller until the cache
        entry expires, so callers should treat it as read-only.
    """
    key = currs if isinstance(currs, str) else tuple(currs)
    return _get_rates_cached(key)


# Keep the cache-management helpers reachable under the public name, as they
# were when the decorator was applied to get_rates_cached directly.
get_rates_cached.cache_info = _get_rates_cached.cache_info
get_rates_cached.cache_clear = _get_rates_cached.cache_clear
def get_rates_concurrent(currs=("BTC", "ETH")):
    """Fetch prices for several currencies in parallel.

    Each currency code is suffixed with ``USD`` to form a symbol and the
    symbols are passed to ``make_request`` on a pool of up to 4 threads.

    Args:
        currs: A single currency code (str) or an iterable of currency
            codes.

    Returns:
        dict mapping each symbol (currency code + ``USD``) to the value
        returned by ``make_request`` for that symbol.
    """
    # A bare string must be wrapped; otherwise the symbol itself would be
    # iterated character by character and yield one-letter "symbols".
    if isinstance(currs, str):
        currs = [currs]
    currency_pairs = [currency + USD for currency in currs]
    if not currency_pairs:
        return {}

    # The context manager shuts the pool down once all requests finish, so
    # worker threads are not leaked on every call.
    with ThreadPoolExecutor(max_workers=4) as executor:
        prices = list(executor.map(make_request, currency_pairs))
    return dict(zip(currency_pairs, prices))
def get_rates(currs=("BTC", "ETH")):
    """Fetch prices for the given currencies one after another.

    Args:
        currs: A single currency code (str) or an iterable of currency
            codes.

    Returns:
        list of single-entry dicts, one per currency in input order, each
        mapping the symbol (currency code + ``USD``) to the value returned
        by ``make_request`` for that symbol.
    """
    # Accept a lone string the same way get_rates_concurrent does, instead
    # of iterating over its characters.
    if isinstance(currs, str):
        currs = [currs]
    currency_pairs = [currency + USD for currency in currs]
    return [{symbol: make_request(symbol)} for symbol in currency_pairs]
def make_request(symbol, timeout=10):
    """Request the price for one symbol from the price endpoint.

    Best-effort: any network, HTTP-layer, or response-format problem
    results in ``None`` rather than an exception.

    Args:
        symbol: Trading-pair symbol sent as the ``symbol`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` may block indefinitely.

    Returns:
        The ``"price"`` field of the JSON response, exactly as decoded (no
        numeric conversion is applied), or ``None`` if the request failed
        or the response did not contain that field.
    """
    params = {"symbol": symbol}
    try:
        r = requests.get(
            url=URL + API_TYPES["price"], params=params, timeout=timeout
        )
        return json.loads(r.content)["price"]
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # RequestException: connection errors and timeouts.
        # ValueError: body is not valid JSON.
        # KeyError / TypeError: JSON without a "price" field (e.g. an error
        # payload) or not an object at all.
        # Unlike a bare ``except``, this lets KeyboardInterrupt and
        # SystemExit propagate.
        return None