From a46d6196cac6c3d73b43831f1892b1e23c84927b Mon Sep 17 00:00:00 2001 From: foolcage <5533061@qq.com> Date: Wed, 8 Aug 2018 18:16:53 +0800 Subject: [PATCH] eos statistic rest api --- fooltrader/api/esapi/esapi.py | 23 ++++++- fooltrader/botsamples/eos_statistic_bot.py | 26 ++++---- .../botsamples/eos_user_statistic_bot.py | 28 +++++---- fooltrader/domain/data/es_quote.py | 4 +- fooltrader/rest/__init__.py | 6 +- fooltrader/rest/controller/__init__.py | 1 + fooltrader/rest/{ => controller}/security.py | 0 .../rest/{ => controller}/subscription.py | 0 fooltrader/rest/controller/tech.py | 60 +++++++++++++++++++ fooltrader/rest/tech.py | 27 --------- fooltrader/settings.py | 2 +- fooltrader/utils/es_utils.py | 3 +- 12 files changed, 119 insertions(+), 61 deletions(-) create mode 100644 fooltrader/rest/controller/__init__.py rename fooltrader/rest/{ => controller}/security.py (100%) rename fooltrader/rest/{ => controller}/subscription.py (100%) create mode 100644 fooltrader/rest/controller/tech.py delete mode 100644 fooltrader/rest/tech.py diff --git a/fooltrader/api/esapi/esapi.py b/fooltrader/api/esapi/esapi.py index 4200a11..7f49c7f 100644 --- a/fooltrader/api/esapi/esapi.py +++ b/fooltrader/api/esapi/esapi.py @@ -8,7 +8,7 @@ from fooltrader.contract.data_contract import KDATA_STOCK_COL, KDATA_FUTURE_COL, KDATA_INDEX_COL, \ KDATA_COMMON_COL from fooltrader.contract.es_contract import get_es_kdata_index, get_cryptocurrency_user_statistic_index, \ - get_cryptocurrency_daily_user_statistic_index + get_cryptocurrency_daily_user_statistic_index, get_es_statistic_index from fooltrader.domain.business.es_subscription import PriceSubscription from fooltrader.utils.es_utils import es_resp_to_payload from fooltrader.utils.utils import to_time_str @@ -132,6 +132,27 @@ def es_get_kdata(security_item, exchange=None, the_date=None, start_date=None, e return es_resp_to_payload(resp, csv) +def es_get_statistic(security_item, the_date=None, start_date=None, end_date=None, level='day', + from_idx=0, size=500): + security_item = to_security_item(security_item) + + index = get_es_statistic_index(security_type=security_item['type'], exchange=security_item['exchange'], + level=level) + # 单日的日k线直接按id获取 + if level == 'day' and the_date: + doc_id = '{}_{}'.format(security_item['id'], to_time_str(the_date)) + return es_client.get_source(index=index, doc_type='doc', id=doc_id) + elif start_date and end_date: + s = Search(using=es_client, index=index, doc_type='doc') \ + .filter('term', code=security_item['code']) \ + .filter('range', timestamp={'gte': start_date, 'lte': end_date}) \ + .sort({"timestamp": {"order": "asc"}}) + + resp = s[from_idx:from_idx + size].execute() + + return es_resp_to_payload(resp) + + if __name__ == '__main__': print(es_get_kdata('300027', the_date='2017-09-04')) print(es_get_kdata('300027', the_date='2017-09-04', fields=['close'])) diff --git a/fooltrader/botsamples/eos_statistic_bot.py b/fooltrader/botsamples/eos_statistic_bot.py index 3e8e0c8..00ad15a 100644 --- a/fooltrader/botsamples/eos_statistic_bot.py +++ b/fooltrader/botsamples/eos_statistic_bot.py @@ -18,8 +18,8 @@ class EosStatisticBot(NotifyEventBot): - BIG_ORDER = 2000 - MIDDLE_ORDER = 500 + BIG_ORDER = 2000 * 10000 + MIDDLE_ORDER = 500 * 10000 def on_init(self): super().on_init() @@ -132,19 +132,19 @@ def generate_eos_daily_statistic(self): turnover = self.df['turnover'].sum() flow = (self.df['turnover'] * self.df['direction']).sum() - flowIn = self.df[self.df['direction'] == 1]['volume'].sum() - flowOut = self.df[self.df['direction'] == -1]['volume'].sum() + flowIn = self.df[self.df['direction'] == 1]['turnover'].sum() + flowOut = self.df[self.df['direction'] == -1]['turnover'].sum() - bigFlowIn = self.df[(self.df['direction'] == 1) & (self.df['volume'] >= self.BIG_ORDER)]['volume'].sum() - middleFlowIn = self.df[(self.df['direction'] == 1) & (self.df['volume'] >= self.MIDDLE_ORDER) & ( - self.df['volume'] < self.BIG_ORDER)]['volume'].sum() - smallFlowIn = self.df[(self.df['direction'] == 1) & (self.df['volume'] < self.MIDDLE_ORDER)]['volume'].sum() + bigFlowIn = self.df[(self.df['direction'] == 1) & (self.df['turnover'] >= self.BIG_ORDER)]['turnover'].sum() + middleFlowIn = self.df[(self.df['direction'] == 1) & (self.df['turnover'] >= self.MIDDLE_ORDER) & ( + self.df['turnover'] < self.BIG_ORDER)]['turnover'].sum() + smallFlowIn = self.df[(self.df['direction'] == 1) & (self.df['turnover'] < self.MIDDLE_ORDER)]['turnover'].sum() - bigFlowOut = self.df[(self.df['direction'] == -1) & (self.df['volume'] >= self.BIG_ORDER)]['volume'].sum() - middleFlowOut = self.df[(self.df['direction'] == -1) & (self.df['volume'] >= self.MIDDLE_ORDER) & ( - self.df['volume'] < self.BIG_ORDER)]['volume'].sum() - smallFlowOut = self.df[(self.df['direction'] == -1) & (self.df['volume'] < self.MIDDLE_ORDER)][ - 'volume'].sum() + bigFlowOut = self.df[(self.df['direction'] == -1) & (self.df['turnover'] >= self.BIG_ORDER)]['turnover'].sum() + middleFlowOut = self.df[(self.df['direction'] == -1) & (self.df['turnover'] >= self.MIDDLE_ORDER) & ( + self.df['turnover'] < self.BIG_ORDER)]['turnover'].sum() + smallFlowOut = self.df[(self.df['direction'] == -1) & (self.df['turnover'] < self.MIDDLE_ORDER)][ + 'turnover'].sum() self.update_statistic_doc(self.latest_statistic_record, {'volume': volume, 'turnover': turnover, diff --git a/fooltrader/botsamples/eos_user_statistic_bot.py b/fooltrader/botsamples/eos_user_statistic_bot.py index 2018947..e1c35ea 100644 --- a/fooltrader/botsamples/eos_user_statistic_bot.py +++ b/fooltrader/botsamples/eos_user_statistic_bot.py @@ -85,12 +85,12 @@ def on_event(self, event_item): self.df = pd.DataFrame(self.item_list) self.generate_user_statistic() - # print(self.es_actions[0:10]) - # if self.es_actions: - # resp = elasticsearch.helpers.bulk(es_client, self.es_actions) - # self.logger.info("index success:{} failed:{}".format(resp[0], len(resp[1]))) - # if resp[1]: - # self.logger.error("error:{}".format(resp[1])) + + if self.es_actions: + resp = elasticsearch.helpers.bulk(es_client, self.es_actions) + self.logger.info("index success:{} failed:{}".format(resp[0], len(resp[1]))) + if resp[1]: + self.logger.error("error:{}".format(resp[1])) self.init_new_computing_interval(event_item['timestamp']) self.es_actions = [] @@ -112,9 +112,9 @@ def update_statistic_doc(self, statistic_doc, append_record, updateTimestamp): statistic_doc[key] += float(the_value) else: statistic_doc[key] = float(the_value) - statistic_doc['updateTimestamp'] = self.last_mirco_time_str - statistic_doc.save(force=True) - # self.es_actions.append(statistic_doc.to_dict(include_meta=True)) + statistic_doc['updateTimestamp'] = updateTimestamp + # statistic_doc.save(force=True) + self.es_actions.append(statistic_doc.to_dict(include_meta=True)) def update_daily_user_statistic(self, user_id, record, update_timestamp): latest_user_daily_statistic = self.user_map_latest_user_daily_statistic.get(user_id) @@ -125,6 +125,7 @@ def update_daily_user_statistic(self, user_id, record, update_timestamp): latest_user_daily_statistic = EosUserStatistic( meta={'id': the_record['id'], 'index': daily_user_statistic_index_name}, **the_record) + self.user_map_latest_user_daily_statistic[user_id] = latest_user_daily_statistic # ignore the user statistic has computed before if latest_user_daily_statistic and self.kdata_timestamp <= to_timestamp( @@ -138,11 +139,11 @@ def update_daily_user_statistic(self, user_id, record, update_timestamp): meta={'id': doc_id, 'index': daily_user_statistic_index_name}, id=doc_id, userId=user_id, - timestamp=to_time_str(self.last_day_time_str, time_fmt=TIME_FORMAT_MICRO), + timestamp=self.last_day_time_str, securityId=self.security_id, code=self.security_item['code'], name=self.security_item['name']) - self.user_map_latest_user_daily_statistic[user_id] = latest_user_daily_statistic + self.user_map_latest_user_daily_statistic[user_id] = latest_user_daily_statistic # update user daily statistic self.update_statistic_doc(latest_user_daily_statistic, record, update_timestamp) @@ -156,6 +157,7 @@ def update_user_statistic(self, user_id, record, update_timestamp): if the_record: latest_user_statistic = EosUserStatistic(meta={'id': doc_id, 'index': user_statistic_index_name}, **the_record) + self.user_map_latest_user_statistic[user_id] = latest_user_statistic # ignore the user statistic has computed before if latest_user_statistic and self.kdata_timestamp <= to_timestamp( latest_user_statistic['updateTimestamp']): @@ -165,11 +167,11 @@ def update_user_statistic(self, user_id, record, update_timestamp): latest_user_statistic = EosUserStatistic(meta={'id': doc_id, 'index': user_statistic_index_name}, id=doc_id, userId=user_id, - timestamp=to_time_str(self.last_day_time_str, time_fmt=TIME_FORMAT_MICRO), + timestamp=self.last_day_time_str, securityId=self.security_id, code=self.security_item['code'], name=self.security_item['name']) - self.user_map_latest_user_statistic[user_id] = latest_user_statistic + self.user_map_latest_user_statistic[user_id] = latest_user_statistic # update user statistic self.update_statistic_doc(latest_user_statistic, record, update_timestamp) diff --git a/fooltrader/domain/data/es_quote.py b/fooltrader/domain/data/es_quote.py index 42cd8a9..bb22cb3 100644 --- a/fooltrader/domain/data/es_quote.py +++ b/fooltrader/domain/data/es_quote.py @@ -168,8 +168,8 @@ class Meta: class EosUserStatistic(BaseDocType): id = Keyword() userId = Keyword() - timestamp = Date() - updateTimestamp = Date() + timestamp = Date(format="yyyyMMdd HHmmss.SSS||yyyy-MM-dd||epoch_millis") + updateTimestamp = Date(format="yyyyMMdd HHmmss.SSS||yyyy-MM-dd||epoch_millis") securityId = Keyword() code = Keyword() name = Keyword() diff --git a/fooltrader/rest/__init__.py b/fooltrader/rest/__init__.py index 90e42dd..1493c44 100644 --- a/fooltrader/rest/__init__.py +++ b/fooltrader/rest/__init__.py @@ -15,6 +15,6 @@ app.config.from_object(Config(root_path=FOOLTRADER_STORE_PATH)) app.config['JSON_AS_ASCII'] = False -from fooltrader.rest.security import * -from fooltrader.rest.subscription import * -from fooltrader.rest.tech import * +from fooltrader.rest.controller.security import * +from fooltrader.rest.controller.subscription import * +from fooltrader.rest.controller.tech import * diff --git a/fooltrader/rest/controller/__init__.py b/fooltrader/rest/controller/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/fooltrader/rest/controller/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/fooltrader/rest/security.py b/fooltrader/rest/controller/security.py similarity index 100% rename from fooltrader/rest/security.py rename to fooltrader/rest/controller/security.py diff --git a/fooltrader/rest/subscription.py b/fooltrader/rest/controller/subscription.py similarity index 100% rename from fooltrader/rest/subscription.py rename to fooltrader/rest/controller/subscription.py diff --git a/fooltrader/rest/controller/tech.py b/fooltrader/rest/controller/tech.py new file mode 100644 index 0000000..c252f6d --- /dev/null +++ b/fooltrader/rest/controller/tech.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +from flask import request + +from fooltrader.api.esapi import esapi +from fooltrader.rest import app +from fooltrader.rest.common import success + + +@app.route('/tech/kdata/', methods=['GET']) +def get_kdata(securityid): + the_date = request.args.get('the_date') + start_date = request.args.get('start_date') + end_date = request.args.get('end_date') + level = request.args.get('level', 'day') + + fields = request.args.get('fields') + if not fields: + fields = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] + + from_idx = request.args.get('from_idx', 0) + size = request.args.get('size', 500) + + result = esapi.es_get_kdata(security_item=securityid, the_date=the_date, start_date=start_date, + end_date=end_date, fields=fields, csv=True, + level=level, from_idx=int(from_idx), size=int(size)) + + return success(result) + + +@app.route('/tech/statistic/', methods=['GET']) +def get_statistic(securityid): + the_date = request.args.get('the_date') + start_date = request.args.get('start_date') + end_date = request.args.get('end_date') + level = request.args.get('level', 'day') + + from_idx = request.args.get('from_idx', 0) + size = request.args.get('size', 500) + + result = esapi.es_get_statistic(security_item=securityid, the_date=the_date, start_date=start_date, + end_date=end_date, level=level, from_idx=int(from_idx), size=int(size)) + + return success(result) + + +@app.route('/tech/user_statistic/', defaults={'user_id': None}, methods=['GET']) +@app.route('/tech/user_statistic//', methods=['GET']) +def get_user_statistic(main_chain, user_id): + start_date = request.args.get('start_date') + end_date = request.args.get('end_date') + security_id = request.args.get('security_id', 'cryptocurrency_contract_RAM-EOS') + + from_idx = request.args.get('from_idx', 0) + size = request.args.get('size', 100) + + result = esapi.es_get_user_statistic(main_chain=main_chain, security_id=security_id, user_id=user_id, + start_date=start_date, + end_date=end_date, from_idx=int(from_idx), size=int(size)) + + return success(result) diff --git a/fooltrader/rest/tech.py b/fooltrader/rest/tech.py deleted file mode 100644 index b08cc76..0000000 --- a/fooltrader/rest/tech.py +++ /dev/null @@ -1,27 +0,0 @@ -# -*- coding: utf-8 -*- -from flask import request - -from fooltrader.api.esapi import esapi -from fooltrader.rest import app -from fooltrader.rest.common import success - - -@app.route('/kdata/', methods=['GET']) -def get_kdata(securityid): - the_date = request.args.get('the_date') - start_date = request.args.get('start_date') - end_date = request.args.get('end_date') - level = request.args.get('level', 'day') - - fields = request.args.get('fields') - if not fields: - fields = ['open', 'high', 'low', 'close', 'volume'] - - from_idx = request.args.get('from_idx', 0) - size = request.args.get('size', 10) - - result = esapi.es_get_kdata(security_item=securityid, the_date=the_date, start_date=start_date, - end_date=end_date, fields=fields, csv=True, - level=level, from_idx=int(from_idx), size=int(size)) - - return success(result) diff --git a/fooltrader/settings.py b/fooltrader/settings.py index 8aada07..6458fa5 100644 --- a/fooltrader/settings.py +++ b/fooltrader/settings.py @@ -123,7 +123,7 @@ g_http_proxy_items = [] g_socks2http_proxy_items = {} -TIME_FORMAT_MICRO = '%Y%m%dT%H%M%S.%f' +TIME_FORMAT_MICRO = "%Y%m%d %H%M%S.%f" TIME_FORMAT_SEC = '%Y-%m-%d %H:%M:%S' diff --git a/fooltrader/utils/es_utils.py b/fooltrader/utils/es_utils.py index 4f049c6..a078174 100644 --- a/fooltrader/utils/es_utils.py +++ b/fooltrader/utils/es_utils.py @@ -87,7 +87,8 @@ def es_resp_to_payload(resp, csv=False): datas = [hit['_source'].to_dict() for hit in resp['hits']['hits']] if csv: - datas = [[data['open'], data['high'], data['low'], data['close'], data['volume']] for data in datas] + datas = [[data['timestamp'], data['open'], data['high'], data['low'], data['close'], data['volume']] for data in + datas] return { 'total': resp['hits']['total'],