eos user statistic
foolcage committed Aug 7, 2018
1 parent 779c1e7 commit 3c1295f
Showing 7 changed files with 373 additions and 123 deletions.
49 changes: 48 additions & 1 deletion fooltrader/api/esapi/esapi.py
@@ -7,7 +7,8 @@
from fooltrader.api.technical import to_security_item
from fooltrader.contract.data_contract import KDATA_STOCK_COL, KDATA_FUTURE_COL, KDATA_INDEX_COL, \
KDATA_COMMON_COL
from fooltrader.contract.es_contract import get_es_kdata_index
from fooltrader.contract.es_contract import get_es_kdata_index, get_cryptocurrency_user_statistic_index, \
get_cryptocurrency_daily_user_statistic_index
from fooltrader.domain.business.es_subscription import PriceSubscription
from fooltrader.utils.es_utils import es_resp_to_payload
from fooltrader.utils.utils import to_time_str
@@ -24,6 +25,52 @@ def es_get_subscription(user_id=None, security_id=None, from_idx=0, size=500):
return es_resp_to_payload(resp)


def es_get_latest_daily_user_statistic(user_id, main_chain='eos', security_id='cryptocurrency_contract_RAM-EOS'):
index = get_cryptocurrency_daily_user_statistic_index(main_chain=main_chain)

s = Search(using=es_client, index=index, doc_type='doc') \
.filter('term', userId=user_id) \
.filter('term', securityId=security_id)

s = s.sort({"timestamp": {"order": "desc"}})
resp = s[0:1].execute()

datas = [hit['_source'].to_dict() for hit in resp['hits']['hits']]
if datas:
return datas[0]
return None
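
A minimal usage sketch for the new helper; the account name and the fields read from the returned document are illustrative assumptions, not taken from this commit:

# hypothetical caller; es_get_latest_daily_user_statistic returns the newest daily doc or None
latest = es_get_latest_daily_user_statistic(user_id='someaccount1', main_chain='eos',
                                            security_id='cryptocurrency_contract_RAM-EOS')
if latest:
    print(latest['timestamp'], latest.get('volume'))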


def es_get_user_statistic(main_chain='eos', security_id='cryptocurrency_contract_RAM-EOS', user_id=None,
start_date=None, end_date=None, from_idx=0, size=100, order='volume'):
index = get_cryptocurrency_user_statistic_index(main_chain=main_chain)

# get the user time range statistic
if user_id and start_date and end_date:
index = get_cryptocurrency_daily_user_statistic_index(main_chain=main_chain)

s = Search(using=es_client, index=index, doc_type='doc') \
.filter('term', userId=user_id) \
.filter('term', securityId=security_id) \
.filter('range', timestamp={'gte': start_date, 'lte': end_date})

s = s.sort({"timestamp": {"order": "asc"}})
# get the user latest statistic
elif user_id:
doc_id = '{}_{}'.format(user_id, security_id)
return es_client.get_source(index=index, doc_type='doc', id=doc_id, ignore=404)
# get top 100 user latest statistic
else:
s = Search(using=es_client, index=index, doc_type='doc') \
.filter('term', securityId=security_id)

s = s.sort({order: {"order": "desc"}})

resp = s[from_idx:from_idx + size].execute()

return es_resp_to_payload(resp)
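
es_get_user_statistic branches three ways: a ranked query over all accounts, a direct get of one account's latest doc, and a date-range query against the daily index. A sketch under assumed account names and dates:

# top accounts ranked by volume for the default RAM-EOS contract
top = es_get_user_statistic(main_chain='eos', size=10, order='volume')

# latest aggregate statistic for a single account (fetched by doc id)
one = es_get_user_statistic(user_id='someaccount1')

# daily statistics for one account over a time range
daily = es_get_user_statistic(user_id='someaccount1',
                              start_date='2018-08-01', end_date='2018-08-07')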


def es_get_kdata(security_item, exchange=None, the_date=None, start_date=None, end_date=None, level='day', fields=None,
from_idx=0, size=500, csv=False):
"""
@@ -1,26 +1,23 @@
# -*- coding: utf-8 -*-
from datetime import timedelta
from datetime import timedelta, datetime

import pandas as pd

from fooltrader.bot.bot import NotifyEventBot
from fooltrader.contract.es_contract import get_es_kdata_index, get_es_statistic_index, \
get_cryptocurrency_user_statistic
from fooltrader.domain.data.es_quote import CommonKData, CommonStatistic, EosUserStatistic
from fooltrader.contract.es_contract import get_es_kdata_index, get_es_statistic_index
from fooltrader.domain.data.es_quote import CommonKData, CommonStatistic
from fooltrader.settings import TIME_FORMAT_MICRO
from fooltrader.utils.es_utils import es_get_latest_timestamp, es_get_latest_record, es_index_mapping
from fooltrader.utils.utils import to_timestamp, to_time_str, fill_doc_type, is_same_date, is_same_time

statistic_index_name = get_es_statistic_index(security_type='cryptocurrency', exchange='contract')
user_statistic_index_name = get_cryptocurrency_user_statistic()
kdata_index_name = get_es_kdata_index(security_type='cryptocurrency', exchange='contract', level='1min')

es_index_mapping(statistic_index_name, CommonStatistic)
es_index_mapping(user_statistic_index_name, EosUserStatistic)
es_index_mapping(kdata_index_name, CommonKData)


class StatisticBot(NotifyEventBot):
class EosStatisticBot(NotifyEventBot):
BIG_ORDER = 2000
MIDDLE_ORDER = 500

@@ -36,7 +33,7 @@ def on_init(self):
# get latest kdata timestamp
latest_kdata_timestamp = es_get_latest_timestamp(index=kdata_index_name, query=query)

# get latest statistic timestamp
# get latest eos statistic timestamp
latest_statistic_record = es_get_latest_record(index=statistic_index_name,
query=query, time_field='updateTimestamp')
if latest_statistic_record:
@@ -51,107 +48,82 @@ def on_init(self):
else:
self.latest_statistic_record = None

# get latest user statistic timestamp
user_statistic_index_name = get_cryptocurrency_user_statistic()
latest_eos_user_statistic_record = es_get_latest_record(index=user_statistic_index_name,
query=query, time_field='updateTimestamp')

if latest_eos_user_statistic_record:
self.latest_eos_user_statistic_record = EosUserStatistic(
meta={'id': latest_eos_user_statistic_record['id'], 'index': user_statistic_index_name},
**latest_eos_user_statistic_record)

if not is_same_time(latest_kdata_timestamp, self.latest_eos_user_statistic_record['updateTimestamp']):
self.logger.warning(
"latest_kdata_timestamp:{},latest_eos_user_statistic_timestamp:{}".format(latest_kdata_timestamp,
self.latest_statistic_record[
'updateTimestamp']))
else:
self.latest_eos_user_statistic_record = None

if latest_kdata_timestamp and self.latest_eos_user_statistic_record and self.latest_eos_user_statistic_record:
if latest_kdata_timestamp and self.latest_statistic_record:
self.start_timestamp = min(latest_kdata_timestamp,
to_timestamp(self.latest_eos_user_statistic_record['updateTimestamp']),
to_timestamp(self.latest_eos_user_statistic_record['updateTimestamp']))
to_timestamp(self.latest_statistic_record['updateTimestamp']))

def after_init(self):
super().after_init()
if not self.start_timestamp:
self.start_timestamp = to_timestamp(self.security_item['listDate'])
# the last timestamp for the computing interval
self.last_timestamp = None
self.last_day_time_str = None
self.last_mirco_time_str = None

self.df = pd.DataFrame()
self.item_list = []

self.computing_start = None

def init_new_computing_interval(self, event_timestamp):
self.last_timestamp = to_timestamp(event_timestamp)
self.kdata_timestamp = self.last_timestamp + timedelta(seconds=-self.last_timestamp.second,
microseconds=-self.last_timestamp.microsecond)

self.last_day_time_str = to_time_str(self.kdata_timestamp)
self.last_mirco_time_str = to_time_str(self.kdata_timestamp, time_fmt=TIME_FORMAT_MICRO)
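
The negative timedelta in init_new_computing_interval truncates an event timestamp to its minute boundary; a small worked example with an invented timestamp:

from datetime import datetime, timedelta

ts = datetime(2018, 8, 7, 10, 15, 42, 123456)
bucket = ts + timedelta(seconds=-ts.second, microseconds=-ts.microsecond)
# bucket == datetime(2018, 8, 7, 10, 15) -- the 1-minute interval the event belongs to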

def on_event(self, event_item):
if not self.computing_start:
self.computing_start = datetime.now()
if not self.last_timestamp:
self.last_timestamp = to_timestamp(event_item['timestamp'])
self.init_new_computing_interval(event_item['timestamp'])

current_timestamp = to_timestamp(event_item['timestamp'])

# calculating last minute
if current_timestamp.minute != self.last_timestamp.minute:
self.df.fillna(0, inplace=True)
self.generate_kdata()
self.generate_statistic()

self.last_timestamp = current_timestamp
self.df = pd.DataFrame()

self.df = self.df.append(event_item, ignore_index=True)

# class CommonStatistic(BaseDocType):
# id = Keyword()
# timestamp = Date()
# securityId = Keyword()
# code = Keyword()
# name = Keyword()
#
# volume = Float()
# turnover = Float()
# flow = Float()
# flowIn = Float()
# flowOut = Float()
# bigFlowIn = Float()
# middleFlowIn = Float()
# smallFlowIn = Float()
# bigFlowOut = Float()
# middleFlowOut = Float()
# smallFlowOut = Float()
#
# class Meta:
# doc_type = 'doc'
# all = MetaField(enabled=False)
# dynamic = MetaField('strict')

def update_statistic_doc(self, append_record, updateTimestamp):
self.df = pd.DataFrame(self.item_list)

self.generate_1min_kdata()
self.generate_eos_daily_statistic()

self.init_new_computing_interval(event_item['timestamp'])
self.item_list = []

self.logger.info("using computing time:{}".format(datetime.now() - self.computing_start))
self.computing_start = datetime.now()

self.item_list.append(event_item)

def update_statistic_doc(self, statistic_doc, append_record, updateTimestamp):
for key in append_record.keys():
if pd.isna(append_record[key]):
the_value = 0
else:
the_value = append_record[key]

if (key in self.latest_statistic_record) and (self.latest_statistic_record[key] != 0):
self.latest_statistic_record[key] += the_value
if key in statistic_doc:
statistic_doc[key] += float(the_value)
else:
self.latest_statistic_record[key] = the_value
self.latest_statistic_record['updateTimestamp'] = updateTimestamp
self.latest_statistic_record.save(force=True)
statistic_doc[key] = float(the_value)
statistic_doc['updateTimestamp'] = updateTimestamp
statistic_doc.save(force=True)
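
The loop above is an additive upsert: missing keys are created, existing ones are incremented, and NaN contributions count as zero. A dict-based illustration of the same semantics (the Elasticsearch DocType and its save() are left out for brevity):

import pandas as pd

doc = {'volume': 10.0}
append_record = {'volume': 5, 'turnover': 2.5, 'flow': float('nan')}
for key, value in append_record.items():
    value = 0 if pd.isna(value) else value
    doc[key] = doc.get(key, 0.0) + float(value)
# doc == {'volume': 15.0, 'turnover': 2.5, 'flow': 0.0}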

def generate_statistic(self):
kdata_timestamp = self.last_timestamp + timedelta(seconds=-self.last_timestamp.second,
microseconds=-self.last_timestamp.microsecond)

if self.latest_statistic_record and kdata_timestamp < to_timestamp(
def generate_eos_daily_statistic(self):
# ignore the statistic has computed before
if self.latest_statistic_record and self.kdata_timestamp <= to_timestamp(
self.latest_statistic_record['updateTimestamp']):
return

# update the statistic
if (not self.latest_statistic_record) or (not is_same_date(self.latest_statistic_record['timestamp'],
self.df['timestamp'][0])):
timestamp = to_time_str(self.last_timestamp)
doc_id = "{}_{}".format(self.security_id, timestamp)
doc_id = "{}_{}".format(self.security_id, self.last_day_time_str)
self.latest_statistic_record = CommonStatistic(meta={'id': doc_id, 'index': statistic_index_name},
id=doc_id,
timestamp=timestamp,
timestamp=self.last_day_time_str,
securityId=self.security_id,
code=self.security_item['code'],
name=self.security_item['name'])
@@ -174,23 +146,25 @@ def generate_statistic(self):
smallFlowOut = self.df[(self.df['direction'] == -1) & (self.df['volume'] < self.MIDDLE_ORDER)][
'volume'].sum()

time_str = to_time_str(kdata_timestamp, time_fmt=TIME_FORMAT_MICRO)

self.update_statistic_doc({
'volume': volume,
'turnover': turnover,
'flow': flow,
'flowIn': flowIn,
'flowOut': flowOut,
'bigFlowIn': bigFlowIn,
'middleFlowIn': middleFlowIn,
'smallFlowIn': smallFlowIn,
'bigFlowOut': bigFlowOut,
'middleFlowOut': middleFlowOut,
'smallFlowOut': smallFlowOut
}, updateTimestamp=time_str)

def generate_kdata(self):
self.update_statistic_doc(self.latest_statistic_record, {'volume': volume,
'turnover': turnover,
'flow': flow,
'flowIn': flowIn,
'flowOut': flowOut,
'bigFlowIn': bigFlowIn,
'middleFlowIn': middleFlowIn,
'smallFlowIn': smallFlowIn,
'bigFlowOut': bigFlowOut,
'middleFlowOut': middleFlowOut,
'smallFlowOut': smallFlowOut
}, updateTimestamp=self.last_mirco_time_str)
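
The flow buckets feeding the call above are boolean-mask sums over the per-interval trade frame; a self-contained sketch with invented trades (whether the hidden big/middle buckets use >= or > at the thresholds is an assumption, since those lines are collapsed in this diff):

import pandas as pd

BIG_ORDER, MIDDLE_ORDER = 2000, 500
df = pd.DataFrame({'direction': [1, 1, -1, -1],
                   'volume': [2500.0, 300.0, 800.0, 100.0]})

bigFlowIn = df[(df['direction'] == 1) & (df['volume'] >= BIG_ORDER)]['volume'].sum()        # 2500.0
smallFlowIn = df[(df['direction'] == 1) & (df['volume'] < MIDDLE_ORDER)]['volume'].sum()    # 300.0
middleFlowOut = df[(df['direction'] == -1) & (df['volume'] >= MIDDLE_ORDER) &
                   (df['volume'] < BIG_ORDER)]['volume'].sum()                              # 800.0
smallFlowOut = df[(df['direction'] == -1) & (df['volume'] < MIDDLE_ORDER)]['volume'].sum()  # 100.0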

def generate_1min_kdata(self):
doc_id = "{}_{}".format(self.security_id, self.last_mirco_time_str)
kdata_doc = CommonKData(meta={'id': doc_id, 'index': kdata_index_name}, id=doc_id)
if kdata_doc.exist(index=kdata_index_name):
return

se_price = self.df['price']
high = se_price.max()
low = se_price.min()
@@ -199,31 +173,25 @@ def generate_kdata(self):
volume = self.df['volume'].sum()
turnover = self.df['turnover'].sum()

kdata_timestamp = self.last_timestamp + timedelta(seconds=-self.last_timestamp.second,
microseconds=-self.last_timestamp.microsecond)
time_str = to_time_str(kdata_timestamp, time_fmt=TIME_FORMAT_MICRO)
doc_id = "{}_{}".format(self.security_id, time_str)
kdata_json = {
'id': doc_id,
'timestamp': time_str,
'updateTimestamp': time_str,
'timestamp': self.last_mirco_time_str,
'updateTimestamp': self.last_mirco_time_str,
'securityId': self.security_item['id'],
'code': self.security_item['code'],
'name': self.security_item['name'],
'open': open,
'high': high,
'low': low,
'close': close,
'volume': volume,
'turnover': turnover
'open': float(open),
'high': float(high),
'low': float(low),
'close': float(close),
'volume': float(volume),
'turnover': float(turnover)
}

kdata_doc = CommonKData(meta={'id': doc_id, 'index': kdata_index_name})

fill_doc_type(kdata_doc, kdata_json)

kdata_doc.save(force=True)
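
For reference, a self-contained sketch of folding the per-interval trades into one 1-minute bar; computing open/close as first/last price is an assumption here, since those two lines are collapsed in this diff:

import pandas as pd

trades = pd.DataFrame({'price': [3.10, 3.15, 3.12],
                       'volume': [100.0, 50.0, 80.0],
                       'turnover': [310.0, 157.5, 249.6]})

bar = {
    'open': float(trades['price'].iloc[0]),
    'close': float(trades['price'].iloc[-1]),
    'high': float(trades['price'].max()),
    'low': float(trades['price'].min()),
    'volume': float(trades['volume'].sum()),
    'turnover': float(trades['turnover'].sum()),
}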


if __name__ == '__main__':
StatisticBot().run()
EosStatisticBot().run()