From 200d1c7fd09757b54296ca8ad130acb0b6c9028c Mon Sep 17 00:00:00 2001 From: foolcage <5533061@qq.com> Date: Mon, 6 Aug 2018 13:25:05 +0800 Subject: [PATCH] generating RAM/EOS kdata --- fooltrader/bot/bot.py | 15 ++-- fooltrader/botsamples/statistic_bot.py | 96 +++++++++++++++++++++++++ fooltrader/connector/kafka_connector.py | 4 +- fooltrader/contract/es_contract.py | 10 +++ fooltrader/datasource/eos.py | 4 +- fooltrader/domain/data/es_quote.py | 22 ++++++ fooltrader/settings.py | 7 +- 7 files changed, 141 insertions(+), 17 deletions(-) create mode 100644 fooltrader/botsamples/statistic_bot.py diff --git a/fooltrader/bot/bot.py b/fooltrader/bot/bot.py index 35a4b75..88982f6 100644 --- a/fooltrader/bot/bot.py +++ b/fooltrader/bot/bot.py @@ -32,22 +32,19 @@ def on_event(self, event_item): self.logger.info("got event:{}".format(event_item)) def __init__(self, security_id=None): - super.__init__() - - assert security_id is not None + super().__init__() self.security_id = security_id - - self.security_item = to_security_item(self.security_item) - assert security_id is not None - - super.__init__() - self.start_timestamp = None self.end_timestamp = None # setup the user custom settings self.on_init() + assert self.security_id is not None + + self.security_item = to_security_item(self.security_id) + assert self.security_item is not None + self._threads = [] self.quote_topic = get_kafka_tick_topic(security_id=self.security_id) diff --git a/fooltrader/botsamples/statistic_bot.py b/fooltrader/botsamples/statistic_bot.py new file mode 100644 index 0000000..879c32b --- /dev/null +++ b/fooltrader/botsamples/statistic_bot.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +from datetime import timedelta + +import pandas as pd + +from fooltrader.bot.bot import NotifyEventBot +from fooltrader.contract.es_contract import get_es_kdata_index, get_es_statistic_index +from fooltrader.domain.data.es_quote import CommonKData +from fooltrader.settings import TIME_FORMAT_MICRO +from fooltrader.utils.es_utils import es_get_latest_timestamp +from fooltrader.utils.utils import to_timestamp, to_time_str, fill_doc_type + + +class StatisticBot(NotifyEventBot): + + def on_init(self): + super().on_init() + self.security_id = 'cryptocurrency_contract_RAM-EOS' + + query = { + "term": {"securityId": ""} + } + query["term"]["securityId"] = self.security_id + + # get latest kdata timestamp + self.kdata_index_name = get_es_kdata_index(security_type='cryptocurrency', exchange='contract', level='1min') + latest_kdata_timestamp = es_get_latest_timestamp(index=self.kdata_index_name, query=query) + + # get latest statistic timestamp + self.statistic_index_name = get_es_statistic_index(security_type='cryptocurrency', exchange='contract') + latest_statistic_timestamp = es_get_latest_timestamp(index=self.statistic_index_name, query=query) + + if latest_kdata_timestamp != latest_statistic_timestamp: + self.logger.warning("latest_kdata_timestamp:{},latest_statistic_timestamp:{}".format(latest_kdata_timestamp, + latest_statistic_timestamp)) + + if latest_kdata_timestamp and latest_statistic_timestamp: + self.start_timestamp = min(latest_kdata_timestamp, latest_statistic_timestamp) + + if not self.start_timestamp: + self.start_timestamp = to_timestamp('2018-06-09 11:55:00') + + def after_init(self): + super().after_init() + self.last_timestamp = None + self.df = pd.DataFrame() + + def on_event(self, event_item): + if not self.last_timestamp: + self.last_timestamp = to_timestamp(event_item['timestamp']) + + current_timestamp = to_timestamp(event_item['timestamp']) + + # calculating last minute + if current_timestamp.minute != self.last_timestamp.minute: + # generating kdata + se_price = self.df['price'] + high = se_price.max() + low = se_price.min() + open = se_price[0] + close = se_price[len(se_price) - 1] + volume = self.df['volume'].sum() + turnover = self.df['turnover'].sum() + + kdata_timestamp = self.last_timestamp + timedelta(minutes=1, seconds=-self.last_timestamp.second, + microseconds=-self.last_timestamp.microsecond) + time_str = to_time_str(kdata_timestamp, time_fmt=TIME_FORMAT_MICRO) + doc_id = "{}_{}".format(self.security_id, time_str) + kdata_json = { + 'id': doc_id, + 'timestamp': time_str, + 'securityId': self.security_item['id'], + 'code': self.security_item['code'], + 'name': self.security_item['name'], + 'open': open, + 'high': high, + 'low': low, + 'close': close, + 'volume': volume, + 'turnover': turnover + } + + kdata_doc = CommonKData(meta={'id': doc_id, 'index': self.kdata_index_name}) + + fill_doc_type(kdata_doc, kdata_json) + + kdata_doc.save(force=True) + + self.last_timestamp = current_timestamp + self.df = pd.DataFrame() + + self.df = self.df.append(event_item, ignore_index=True) + + +if __name__ == '__main__': + StatisticBot().run() diff --git a/fooltrader/connector/kafka_connector.py b/fooltrader/connector/kafka_connector.py index f0b4b0e..897bf50 100644 --- a/fooltrader/connector/kafka_connector.py +++ b/fooltrader/connector/kafka_connector.py @@ -33,8 +33,8 @@ def _tick_to_kafka(security_item): the_json = tick_item.to_json(force_ascii=False) producer.send(get_kafka_tick_topic(security_item['id']), bytes(the_json, encoding='utf8'), - timestamp_ms=int(datetime.datetime.strptime(tick_item['timestamp'], - TIME_FORMAT_SEC).timestamp())) + timestamp_ms=int(1000 * datetime.datetime.strptime(tick_item['timestamp'], + TIME_FORMAT_SEC).timestamp())) logger.debug("tick_to_kafka {}".format(the_json)) diff --git a/fooltrader/contract/es_contract.py b/fooltrader/contract/es_contract.py index 6ad1c99..99024f1 100644 --- a/fooltrader/contract/es_contract.py +++ b/fooltrader/contract/es_contract.py @@ -13,3 +13,13 @@ def get_es_kdata_index(security_type='stock', exchange='sh', level='day'): def get_es_finance_event_index(event_type='finance_forecast'): return '{}_event'.format(event_type) + + +def get_es_statistic_index(security_type='stock', exchange='sh', level='day'): + # 按 类型_国家_级别 来索引 + if exchange in ['sh', 'sz']: + return '{}_{}_{}_statistic'.format(security_type, 'china', level) + elif exchange in ['nasdaq', 'amex', 'nyse']: + return '{}_{}_{}_statistic'.format(security_type, 'usa', level) + else: + return '{}_{}_{}_statistic'.format(security_type, exchange, level) diff --git a/fooltrader/datasource/eos.py b/fooltrader/datasource/eos.py index c1a6408..a2dea6c 100644 --- a/fooltrader/datasource/eos.py +++ b/fooltrader/datasource/eos.py @@ -32,7 +32,7 @@ def to_tick(item): return { 'timestamp': to_time_str(item['block_time'], time_fmt=TIME_FORMAT_MICRO), - 'securityId': 'cryptocurrency_contact_RAM-EOS', + 'securityId': 'cryptocurrency_contract_RAM-EOS', 'code': 'RAM-EOS', 'price': item['bytes'] / item['price'], 'direction': direction, @@ -59,7 +59,7 @@ def eos_ram_to_kafka(): logger.info("one record:{}".format(one_record)) - security_id = 'cryptocurrency_contact_RAM-EOS' + security_id = 'cryptocurrency_contract_RAM-EOS' latest_timestamp, latest_order = get_latest_timestamp_order(security_id) diff --git a/fooltrader/domain/data/es_quote.py b/fooltrader/domain/data/es_quote.py index 0b0e3b3..9fcb2ed 100644 --- a/fooltrader/domain/data/es_quote.py +++ b/fooltrader/domain/data/es_quote.py @@ -133,6 +133,7 @@ class CryptoCurrencyKData(BaseDocType): high = Float() low = Float() volume = Float() + turnover = Float() preClose = Float() change = Float() changePct = Float() @@ -143,6 +144,26 @@ class Meta: dynamic = MetaField('strict') +class CommonKData(BaseDocType): + id = Keyword() + timestamp = Date() + securityId = Keyword() + code = Keyword() + + name = Keyword() + open = Float() + close = Float() + high = Float() + low = Float() + volume = Float() + turnover = Float() + + class Meta: + doc_type = 'doc' + all = MetaField(enabled=False) + dynamic = MetaField('strict') + + # 股票指数K线 class IndexKData(BaseDocType): id = Keyword() @@ -168,3 +189,4 @@ class IndexKData(BaseDocType): class Meta: all = MetaField(enabled=False) doc_type = 'doc' + dynamic = MetaField('strict') diff --git a/fooltrader/settings.py b/fooltrader/settings.py index 7ede5dd..8aada07 100644 --- a/fooltrader/settings.py +++ b/fooltrader/settings.py @@ -123,15 +123,14 @@ g_http_proxy_items = [] g_socks2http_proxy_items = {} -TIME_FORMAT_MICRO = '%Y-%m-%d %H:%M:%S.%f' +TIME_FORMAT_MICRO = '%Y%m%dT%H%M%S.%f' TIME_FORMAT_SEC = '%Y-%m-%d %H:%M:%S' TIME_FORMAT_DAY = '%Y-%m-%d' -ES_HOSTS = ['172.16.92.200:9200'] -# ES_HOSTS = ['localhost:9200'] - +# ES_HOSTS = ['172.16.92.200:9200'] +ES_HOSTS = ['localhost:9200'] # the action account settings SMTP_HOST = 'smtpdm.aliyun.com'