Skip to content

Commit

Permalink
EOS statistic REST API
Browse files Browse the repository at this point in the history
  • Loading branch information
foolcage committed Aug 8, 2018
1 parent 3c1295f commit a46d619
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 61 deletions.
23 changes: 22 additions & 1 deletion fooltrader/api/esapi/esapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fooltrader.contract.data_contract import KDATA_STOCK_COL, KDATA_FUTURE_COL, KDATA_INDEX_COL, \
KDATA_COMMON_COL
from fooltrader.contract.es_contract import get_es_kdata_index, get_cryptocurrency_user_statistic_index, \
get_cryptocurrency_daily_user_statistic_index
get_cryptocurrency_daily_user_statistic_index, get_es_statistic_index
from fooltrader.domain.business.es_subscription import PriceSubscription
from fooltrader.utils.es_utils import es_resp_to_payload
from fooltrader.utils.utils import to_time_str
Expand Down Expand Up @@ -132,6 +132,27 @@ def es_get_kdata(security_item, exchange=None, the_date=None, start_date=None, e
return es_resp_to_payload(resp, csv)


def es_get_statistic(security_item, the_date=None, start_date=None, end_date=None, level='day',
                     from_idx=0, size=500):
    """Fetch statistic documents for a security from Elasticsearch.

    Args:
        security_item: security id/code or item dict; normalized via ``to_security_item``.
        the_date: single day to fetch; combined with ``level='day'`` enables the
            direct-by-id fast path.
        start_date: inclusive lower bound for a range query.
        end_date: inclusive upper bound for a range query.
        level: statistic granularity; used to pick the ES index (default ``'day'``).
        from_idx: pagination offset into the range query results.
        size: maximum number of hits to return for the range query.

    Returns:
        For the single-day path, the raw ``_source`` dict of the document;
        for the range path, the payload built by ``es_resp_to_payload``.
        NOTE(review): if neither branch matches (e.g. ``level != 'day'`` with only
        ``the_date`` given, or a missing start/end date), the function implicitly
        returns ``None`` — callers must handle that.
    """
    security_item = to_security_item(security_item)

    # Index name is derived from the security's type/exchange and the level.
    index = get_es_statistic_index(security_type=security_item['type'], exchange=security_item['exchange'],
                                   level=level)
    # Single-day daily statistic: fetch directly by document id (id is "<security_id>_<date>").
    if level == 'day' and the_date:
        doc_id = '{}_{}'.format(security_item['id'], to_time_str(the_date))
        return es_client.get_source(index=index, doc_type='doc', id=doc_id)
    elif start_date and end_date:
        # Range query: filter by security code and timestamp window, ascending by time.
        s = Search(using=es_client, index=index, doc_type='doc') \
            .filter('term', code=security_item['code']) \
            .filter('range', timestamp={'gte': start_date, 'lte': end_date}) \
            .sort({"timestamp": {"order": "asc"}})

        # Slice implements pagination: [from_idx, from_idx + size).
        resp = s[from_idx:from_idx + size].execute()

        return es_resp_to_payload(resp)


if __name__ == '__main__':
print(es_get_kdata('300027', the_date='2017-09-04'))
print(es_get_kdata('300027', the_date='2017-09-04', fields=['close']))
Expand Down
26 changes: 13 additions & 13 deletions fooltrader/botsamples/eos_statistic_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@


class EosStatisticBot(NotifyEventBot):
BIG_ORDER = 2000
MIDDLE_ORDER = 500
BIG_ORDER = 2000 * 10000
MIDDLE_ORDER = 500 * 10000

def on_init(self):
super().on_init()
Expand Down Expand Up @@ -132,19 +132,19 @@ def generate_eos_daily_statistic(self):
turnover = self.df['turnover'].sum()
flow = (self.df['turnover'] * self.df['direction']).sum()

flowIn = self.df[self.df['direction'] == 1]['volume'].sum()
flowOut = self.df[self.df['direction'] == -1]['volume'].sum()
flowIn = self.df[self.df['direction'] == 1]['turnover'].sum()
flowOut = self.df[self.df['direction'] == -1]['turnover'].sum()

bigFlowIn = self.df[(self.df['direction'] == 1) & (self.df['volume'] >= self.BIG_ORDER)]['volume'].sum()
middleFlowIn = self.df[(self.df['direction'] == 1) & (self.df['volume'] >= self.MIDDLE_ORDER) & (
self.df['volume'] < self.BIG_ORDER)]['volume'].sum()
smallFlowIn = self.df[(self.df['direction'] == 1) & (self.df['volume'] < self.MIDDLE_ORDER)]['volume'].sum()
bigFlowIn = self.df[(self.df['direction'] == 1) & (self.df['turnover'] >= self.BIG_ORDER)]['turnover'].sum()
middleFlowIn = self.df[(self.df['direction'] == 1) & (self.df['turnover'] >= self.MIDDLE_ORDER) & (
self.df['turnover'] < self.BIG_ORDER)]['turnover'].sum()
smallFlowIn = self.df[(self.df['direction'] == 1) & (self.df['turnover'] < self.MIDDLE_ORDER)]['turnover'].sum()

bigFlowOut = self.df[(self.df['direction'] == -1) & (self.df['volume'] >= self.BIG_ORDER)]['volume'].sum()
middleFlowOut = self.df[(self.df['direction'] == -1) & (self.df['volume'] >= self.MIDDLE_ORDER) & (
self.df['volume'] < self.BIG_ORDER)]['volume'].sum()
smallFlowOut = self.df[(self.df['direction'] == -1) & (self.df['volume'] < self.MIDDLE_ORDER)][
'volume'].sum()
bigFlowOut = self.df[(self.df['direction'] == -1) & (self.df['turnover'] >= self.BIG_ORDER)]['turnover'].sum()
middleFlowOut = self.df[(self.df['direction'] == -1) & (self.df['turnover'] >= self.MIDDLE_ORDER) & (
self.df['turnover'] < self.BIG_ORDER)]['turnover'].sum()
smallFlowOut = self.df[(self.df['direction'] == -1) & (self.df['turnover'] < self.MIDDLE_ORDER)][
'turnover'].sum()

self.update_statistic_doc(self.latest_statistic_record, {'volume': volume,
'turnover': turnover,
Expand Down
28 changes: 15 additions & 13 deletions fooltrader/botsamples/eos_user_statistic_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ def on_event(self, event_item):
self.df = pd.DataFrame(self.item_list)

self.generate_user_statistic()
# print(self.es_actions[0:10])
# if self.es_actions:
# resp = elasticsearch.helpers.bulk(es_client, self.es_actions)
# self.logger.info("index success:{} failed:{}".format(resp[0], len(resp[1])))
# if resp[1]:
# self.logger.error("error:{}".format(resp[1]))

if self.es_actions:
resp = elasticsearch.helpers.bulk(es_client, self.es_actions)
self.logger.info("index success:{} failed:{}".format(resp[0], len(resp[1])))
if resp[1]:
self.logger.error("error:{}".format(resp[1]))

self.init_new_computing_interval(event_item['timestamp'])
self.es_actions = []
Expand All @@ -112,9 +112,9 @@ def update_statistic_doc(self, statistic_doc, append_record, updateTimestamp):
statistic_doc[key] += float(the_value)
else:
statistic_doc[key] = float(the_value)
statistic_doc['updateTimestamp'] = self.last_mirco_time_str
statistic_doc.save(force=True)
# self.es_actions.append(statistic_doc.to_dict(include_meta=True))
statistic_doc['updateTimestamp'] = updateTimestamp
# statistic_doc.save(force=True)
self.es_actions.append(statistic_doc.to_dict(include_meta=True))

def update_daily_user_statistic(self, user_id, record, update_timestamp):
latest_user_daily_statistic = self.user_map_latest_user_daily_statistic.get(user_id)
Expand All @@ -125,6 +125,7 @@ def update_daily_user_statistic(self, user_id, record, update_timestamp):
latest_user_daily_statistic = EosUserStatistic(
meta={'id': the_record['id'], 'index': daily_user_statistic_index_name},
**the_record)
self.user_map_latest_user_daily_statistic[user_id] = latest_user_daily_statistic

# ignore the user statistic has computed before
if latest_user_daily_statistic and self.kdata_timestamp <= to_timestamp(
Expand All @@ -138,11 +139,11 @@ def update_daily_user_statistic(self, user_id, record, update_timestamp):
meta={'id': doc_id, 'index': daily_user_statistic_index_name},
id=doc_id,
userId=user_id,
timestamp=to_time_str(self.last_day_time_str, time_fmt=TIME_FORMAT_MICRO),
timestamp=self.last_day_time_str,
securityId=self.security_id,
code=self.security_item['code'],
name=self.security_item['name'])
self.user_map_latest_user_daily_statistic[user_id] = latest_user_daily_statistic
self.user_map_latest_user_daily_statistic[user_id] = latest_user_daily_statistic

# update user daily statistic
self.update_statistic_doc(latest_user_daily_statistic, record, update_timestamp)
Expand All @@ -156,6 +157,7 @@ def update_user_statistic(self, user_id, record, update_timestamp):
if the_record:
latest_user_statistic = EosUserStatistic(meta={'id': doc_id, 'index': user_statistic_index_name},
**the_record)
self.user_map_latest_user_statistic[user_id] = latest_user_statistic
# ignore the user statistic has computed before
if latest_user_statistic and self.kdata_timestamp <= to_timestamp(
latest_user_statistic['updateTimestamp']):
Expand All @@ -165,11 +167,11 @@ def update_user_statistic(self, user_id, record, update_timestamp):
latest_user_statistic = EosUserStatistic(meta={'id': doc_id, 'index': user_statistic_index_name},
id=doc_id,
userId=user_id,
timestamp=to_time_str(self.last_day_time_str, time_fmt=TIME_FORMAT_MICRO),
timestamp=self.last_day_time_str,
securityId=self.security_id,
code=self.security_item['code'],
name=self.security_item['name'])
self.user_map_latest_user_statistic[user_id] = latest_user_statistic
self.user_map_latest_user_statistic[user_id] = latest_user_statistic

# update user statistic
self.update_statistic_doc(latest_user_statistic, record, update_timestamp)
Expand Down
4 changes: 2 additions & 2 deletions fooltrader/domain/data/es_quote.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ class Meta:
class EosUserStatistic(BaseDocType):
id = Keyword()
userId = Keyword()
timestamp = Date()
updateTimestamp = Date()
timestamp = Date(format="yyyyMMdd HHmmss.SSS||yyyy-MM-dd||epoch_millis")
updateTimestamp = Date(format="yyyyMMdd HHmmss.SSS||yyyy-MM-dd||epoch_millis")
securityId = Keyword()
code = Keyword()
name = Keyword()
Expand Down
6 changes: 3 additions & 3 deletions fooltrader/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
app.config.from_object(Config(root_path=FOOLTRADER_STORE_PATH))
app.config['JSON_AS_ASCII'] = False

from fooltrader.rest.security import *
from fooltrader.rest.subscription import *
from fooltrader.rest.tech import *
from fooltrader.rest.controller.security import *
from fooltrader.rest.controller.subscription import *
from fooltrader.rest.controller.tech import *
1 change: 1 addition & 0 deletions fooltrader/rest/controller/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# -*- coding: utf-8 -*-
File renamed without changes.
File renamed without changes.
60 changes: 60 additions & 0 deletions fooltrader/rest/controller/tech.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
from flask import request

from fooltrader.api.esapi import esapi
from fooltrader.rest import app
from fooltrader.rest.common import success


@app.route('/tech/kdata/<securityid>', methods=['GET'])
def get_kdata(securityid):
    """REST endpoint: k-data (candlestick rows) for one security.

    Query parameters: ``the_date``, ``start_date``, ``end_date``,
    ``level`` (default ``'day'``), ``fields`` (defaults to the standard
    OHLCV columns), ``from_idx`` and ``size`` for pagination.
    """
    args = request.args

    # Fall back to the standard candlestick columns when no fields requested.
    requested_fields = args.get('fields') or ['timestamp', 'open', 'high', 'low', 'close', 'volume']

    result = esapi.es_get_kdata(security_item=securityid,
                                the_date=args.get('the_date'),
                                start_date=args.get('start_date'),
                                end_date=args.get('end_date'),
                                fields=requested_fields,
                                csv=True,
                                level=args.get('level', 'day'),
                                from_idx=int(args.get('from_idx', 0)),
                                size=int(args.get('size', 500)))

    return success(result)


@app.route('/tech/statistic/<securityid>', methods=['GET'])
def get_statistic(securityid):
    """REST endpoint: aggregated statistics for one security.

    Query parameters: ``the_date``, ``start_date``, ``end_date``,
    ``level`` (default ``'day'``), ``from_idx`` and ``size`` for pagination.
    """
    args = request.args

    result = esapi.es_get_statistic(security_item=securityid,
                                    the_date=args.get('the_date'),
                                    start_date=args.get('start_date'),
                                    end_date=args.get('end_date'),
                                    level=args.get('level', 'day'),
                                    from_idx=int(args.get('from_idx', 0)),
                                    size=int(args.get('size', 500)))

    return success(result)


@app.route('/tech/user_statistic/<main_chain>', defaults={'user_id': None}, methods=['GET'])
@app.route('/tech/user_statistic/<main_chain>/<user_id>', methods=['GET'])
def get_user_statistic(main_chain, user_id):
    """REST endpoint: per-user statistics for a main chain.

    ``user_id`` is optional (``None`` lists users). Query parameters:
    ``start_date``, ``end_date``, ``security_id`` (defaults to the EOS RAM
    contract), ``from_idx`` and ``size`` for pagination.
    """
    args = request.args

    # Default security is the EOS RAM contract pair.
    security_id = args.get('security_id', 'cryptocurrency_contract_RAM-EOS')

    result = esapi.es_get_user_statistic(main_chain=main_chain,
                                         security_id=security_id,
                                         user_id=user_id,
                                         start_date=args.get('start_date'),
                                         end_date=args.get('end_date'),
                                         from_idx=int(args.get('from_idx', 0)),
                                         size=int(args.get('size', 100)))

    return success(result)
27 changes: 0 additions & 27 deletions fooltrader/rest/tech.py

This file was deleted.

2 changes: 1 addition & 1 deletion fooltrader/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
g_http_proxy_items = []
g_socks2http_proxy_items = {}

TIME_FORMAT_MICRO = '%Y%m%dT%H%M%S.%f'
TIME_FORMAT_MICRO = "%Y%m%d %H%M%S.%f"

TIME_FORMAT_SEC = '%Y-%m-%d %H:%M:%S'

Expand Down
3 changes: 2 additions & 1 deletion fooltrader/utils/es_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def es_resp_to_payload(resp, csv=False):
datas = [hit['_source'].to_dict() for hit in resp['hits']['hits']]

if csv:
datas = [[data['open'], data['high'], data['low'], data['close'], data['volume']] for data in datas]
datas = [[data['timestamp'], data['open'], data['high'], data['low'], data['close'], data['volume']] for data in
datas]

return {
'total': resp['hits']['total'],
Expand Down

0 comments on commit a46d619

Please sign in to comment.