Skip to content

Commit

Permalink
generating RAM/EOS kdata
Browse files Browse the repository at this point in the history
  • Loading branch information
foolcage committed Aug 6, 2018
1 parent c7b3540 commit 200d1c7
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 17 deletions.
15 changes: 6 additions & 9 deletions fooltrader/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,19 @@ def on_event(self, event_item):
self.logger.info("got event:{}".format(event_item))

def __init__(self, security_id=None):
super.__init__()

assert security_id is not None
super().__init__()
self.security_id = security_id

self.security_item = to_security_item(self.security_item)
assert security_id is not None

super.__init__()

self.start_timestamp = None
self.end_timestamp = None

# setup the user custom settings
self.on_init()

assert self.security_id is not None

self.security_item = to_security_item(self.security_id)
assert self.security_item is not None

self._threads = []

self.quote_topic = get_kafka_tick_topic(security_id=self.security_id)
Expand Down
96 changes: 96 additions & 0 deletions fooltrader/botsamples/statistic_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
from datetime import timedelta

import pandas as pd

from fooltrader.bot.bot import NotifyEventBot
from fooltrader.contract.es_contract import get_es_kdata_index, get_es_statistic_index
from fooltrader.domain.data.es_quote import CommonKData
from fooltrader.settings import TIME_FORMAT_MICRO
from fooltrader.utils.es_utils import es_get_latest_timestamp
from fooltrader.utils.utils import to_timestamp, to_time_str, fill_doc_type


class StatisticBot(NotifyEventBot):

def on_init(self):
super().on_init()
self.security_id = 'cryptocurrency_contract_RAM-EOS'

query = {
"term": {"securityId": ""}
}
query["term"]["securityId"] = self.security_id

# get latest kdata timestamp
self.kdata_index_name = get_es_kdata_index(security_type='cryptocurrency', exchange='contract', level='1min')
latest_kdata_timestamp = es_get_latest_timestamp(index=self.kdata_index_name, query=query)

# get latest statistic timestamp
self.statistic_index_name = get_es_statistic_index(security_type='cryptocurrency', exchange='contract')
latest_statistic_timestamp = es_get_latest_timestamp(index=self.statistic_index_name, query=query)

if latest_kdata_timestamp != latest_statistic_timestamp:
self.logger.warning("latest_kdata_timestamp:{},latest_statistic_timestamp:{}".format(latest_kdata_timestamp,
latest_statistic_timestamp))

if latest_kdata_timestamp and latest_statistic_timestamp:
self.start_timestamp = min(latest_kdata_timestamp, latest_statistic_timestamp)

if not self.start_timestamp:
self.start_timestamp = to_timestamp('2018-06-09 11:55:00')

def after_init(self):
super().after_init()
self.last_timestamp = None
self.df = pd.DataFrame()

def on_event(self, event_item):
if not self.last_timestamp:
self.last_timestamp = to_timestamp(event_item['timestamp'])

current_timestamp = to_timestamp(event_item['timestamp'])

# calculating last minute
if current_timestamp.minute != self.last_timestamp.minute:
# generating kdata
se_price = self.df['price']
high = se_price.max()
low = se_price.min()
open = se_price[0]
close = se_price[len(se_price) - 1]
volume = self.df['volume'].sum()
turnover = self.df['turnover'].sum()

kdata_timestamp = self.last_timestamp + timedelta(minutes=1, seconds=-self.last_timestamp.second,
microseconds=-self.last_timestamp.microsecond)
time_str = to_time_str(kdata_timestamp, time_fmt=TIME_FORMAT_MICRO)
doc_id = "{}_{}".format(self.security_id, time_str)
kdata_json = {
'id': doc_id,
'timestamp': time_str,
'securityId': self.security_item['id'],
'code': self.security_item['code'],
'name': self.security_item['name'],
'open': open,
'high': high,
'low': low,
'close': close,
'volume': volume,
'turnover': turnover
}

kdata_doc = CommonKData(meta={'id': doc_id, 'index': self.kdata_index_name})

fill_doc_type(kdata_doc, kdata_json)

kdata_doc.save(force=True)

self.last_timestamp = current_timestamp
self.df = pd.DataFrame()

self.df = self.df.append(event_item, ignore_index=True)


if __name__ == '__main__':
StatisticBot().run()
4 changes: 2 additions & 2 deletions fooltrader/connector/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def _tick_to_kafka(security_item):
the_json = tick_item.to_json(force_ascii=False)
producer.send(get_kafka_tick_topic(security_item['id']),
bytes(the_json, encoding='utf8'),
timestamp_ms=int(datetime.datetime.strptime(tick_item['timestamp'],
TIME_FORMAT_SEC).timestamp()))
timestamp_ms=int(1000 * datetime.datetime.strptime(tick_item['timestamp'],
TIME_FORMAT_SEC).timestamp()))
logger.debug("tick_to_kafka {}".format(the_json))


Expand Down
10 changes: 10 additions & 0 deletions fooltrader/contract/es_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,13 @@ def get_es_kdata_index(security_type='stock', exchange='sh', level='day'):

def get_es_finance_event_index(event_type='finance_forecast'):
return '{}_event'.format(event_type)


def get_es_statistic_index(security_type='stock', exchange='sh', level='day'):
# 按 类型_国家_级别 来索引
if exchange in ['sh', 'sz']:
return '{}_{}_{}_statistic'.format(security_type, 'china', level)
elif exchange in ['nasdaq', 'amex', 'nyse']:
return '{}_{}_{}_statistic'.format(security_type, 'usa', level)
else:
return '{}_{}_{}_statistic'.format(security_type, exchange, level)
4 changes: 2 additions & 2 deletions fooltrader/datasource/eos.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def to_tick(item):

return {
'timestamp': to_time_str(item['block_time'], time_fmt=TIME_FORMAT_MICRO),
'securityId': 'cryptocurrency_contact_RAM-EOS',
'securityId': 'cryptocurrency_contract_RAM-EOS',
'code': 'RAM-EOS',
'price': item['bytes'] / item['price'],
'direction': direction,
Expand All @@ -59,7 +59,7 @@ def eos_ram_to_kafka():

logger.info("one record:{}".format(one_record))

security_id = 'cryptocurrency_contact_RAM-EOS'
security_id = 'cryptocurrency_contract_RAM-EOS'

latest_timestamp, latest_order = get_latest_timestamp_order(security_id)

Expand Down
22 changes: 22 additions & 0 deletions fooltrader/domain/data/es_quote.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class CryptoCurrencyKData(BaseDocType):
high = Float()
low = Float()
volume = Float()
turnover = Float()
preClose = Float()
change = Float()
changePct = Float()
Expand All @@ -143,6 +144,26 @@ class Meta:
dynamic = MetaField('strict')


class CommonKData(BaseDocType):
id = Keyword()
timestamp = Date()
securityId = Keyword()
code = Keyword()

name = Keyword()
open = Float()
close = Float()
high = Float()
low = Float()
volume = Float()
turnover = Float()

class Meta:
doc_type = 'doc'
all = MetaField(enabled=False)
dynamic = MetaField('strict')


# 股票指数K线
class IndexKData(BaseDocType):
id = Keyword()
Expand All @@ -168,3 +189,4 @@ class IndexKData(BaseDocType):
class Meta:
all = MetaField(enabled=False)
doc_type = 'doc'
dynamic = MetaField('strict')
7 changes: 3 additions & 4 deletions fooltrader/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,14 @@
g_http_proxy_items = []
g_socks2http_proxy_items = {}

TIME_FORMAT_MICRO = '%Y-%m-%d %H:%M:%S.%f'
TIME_FORMAT_MICRO = '%Y%m%dT%H%M%S.%f'

TIME_FORMAT_SEC = '%Y-%m-%d %H:%M:%S'

TIME_FORMAT_DAY = '%Y-%m-%d'

ES_HOSTS = ['172.16.92.200:9200']
# ES_HOSTS = ['localhost:9200']

# ES_HOSTS = ['172.16.92.200:9200']
ES_HOSTS = ['localhost:9200']

# the action account settings
SMTP_HOST = 'smtpdm.aliyun.com'
Expand Down

0 comments on commit 200d1c7

Please sign in to comment.