Skip to content

Commit

Permalink
improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
foolcage committed Jul 24, 2018
1 parent 8354df1 commit a74c4c5
Showing 1 changed file with 14 additions and 33 deletions.
47 changes: 14 additions & 33 deletions fooltrader/connector/es_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@
logger = logging.getLogger(__name__)


def df_to_es(df, doc_type, index_name=None, timestamp_filed='timestamp', query=None, force=False):
# we make the data always have these fields:id,timestamp,securityId
# so we could handle append data to index in uniform way
def df_to_es(df, doc_type, index_name=None, timestamp_filed='timestamp', security_item=None, force=False):
if not index_name:
index_name = doc_type().meta.index

es_index_mapping(index_name, doc_type)

if not force:
query = None
if not force:
query = {
"term": {"securityId": ""}
}
query["term"]["securityId"] = security_item["id"]

start_date = es_get_latest_timestamp(index=index_name, query=query, time_field=timestamp_filed)
logger.info("{} latest timestamp:{}".format(index_name, start_date))
if start_date:
Expand Down Expand Up @@ -82,16 +91,9 @@ def kdata_to_es(security_type='stock', start_code=None, end_code=None, force=Fal
end_code=end_code).iterrows():
index_name = get_es_kdata_index(security_item['type'], security_item['exchange'])

query = None
if not force:
query = {
"term": {"securityId": ""}
}
query["term"]["securityId"] = security_item["id"]

df = get_kdata(security_item, generate_id=True)

df_to_es(df, doc_type=doc_type, index_name=index_name, query=query, force=force)
df_to_es(df, doc_type=doc_type, index_name=index_name, security_item=security_item, force=force)


def finance_sheet_to_es(sheet_type='balance_sheet', start_code=None, end_code=None, force=False):
Expand All @@ -105,13 +107,6 @@ def finance_sheet_to_es(sheet_type='balance_sheet', start_code=None, end_code=No
es_index_mapping(sheet_type, doc_type)

for _, security_item in get_security_list(start_code=start_code, end_code=end_code).iterrows():
query = None
if not force:
query = {
"term": {"securityId": ""}
}
query["term"]["securityId"] = security_item["id"]

if sheet_type == 'balance_sheet':
items = get_balance_sheet_items(security_item)
elif sheet_type == 'income_statement':
Expand All @@ -123,22 +118,15 @@ def finance_sheet_to_es(sheet_type='balance_sheet', start_code=None, end_code=No

df = index_df_with_time(df, index='reportPeriod')

df_to_es(df, doc_type=doc_type, timestamp_filed='reportPeriod', query=query, force=force)
df_to_es(df, doc_type=doc_type, timestamp_filed='reportPeriod', security_item=security_item, force=force)


def usa_stock_finance_to_es(force=False):
for _, security_item in get_security_list(security_type='stock', exchanges=['nasdaq'],
codes=US_STOCK_CODES).iterrows():
query = None
if not force:
query = {
"term": {"securityId": ""}
}
query["term"]["securityId"] = security_item["id"]

df = get_finance_summary_items(security_item)

df_to_es(df, doc_type=FinanceSummary, timestamp_filed='reportPeriod', query=query, force=force)
df_to_es(df, doc_type=FinanceSummary, timestamp_filed='reportPeriod', security_item=security_item, force=force)


def finance_event_to_es(event_type='finance_forecast', start_code=None, end_code=None, force=False):
Expand All @@ -148,19 +136,12 @@ def finance_event_to_es(event_type='finance_forecast', start_code=None, end_code
doc_type = FinanceReportEvent

for _, security_item in get_security_list(start_code=start_code, end_code=end_code).iterrows():
query = None
if not force:
query = {
"term": {"securityId": ""}
}
query["term"]["securityId"] = security_item["id"]

if event_type == 'finance_forecast':
df = get_finance_forecast_event(security_item)
elif event_type == 'finance_report':
df = get_finance_report_event(security_item)

df_to_es(df, doc_type=doc_type, query=query, force=force)
df_to_es(df, doc_type=doc_type, security_item=security_item, force=force)


if __name__ == '__main__':
Expand Down

0 comments on commit a74c4c5

Please sign in to comment.