Skip to content

Commit

Permalink
Merge pull request #3 from MarvinMiao/fix-ns-ha-bug
Browse files Browse the repository at this point in the history
fix(notification_service):
  • Loading branch information
gfork authored Jan 28, 2021
2 parents 49f759d + 27d9e32 commit 8d6abcb
Show file tree
Hide file tree
Showing 25 changed files with 1,341 additions and 219 deletions.
25 changes: 19 additions & 6 deletions flink-ai-flow/ai_flow/application_master/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#
import os
from typing import Text
from ai_flow.rest_endpoint.service.server import AIFlowServer
from ai_flow.rest_endpoint.service.server import AIFlowServer, HighAvailableAIFlowServer
from ai_flow.store.db.base_model import base
from ai_flow.store.sqlalchemy_store import SqlAlchemyStore
from ai_flow.application_master.master_config import MasterConfig, DBType
Expand All @@ -36,7 +36,7 @@ class AIFlowMaster(object):
"""
AI flow master.
"""
def __init__(self, config_file: Text = None) -> None:
def __init__(self, config_file: Text = None, enable_ha=False, server_uri: str = None, ttl_ms=10000) -> None:
"""
Set the master attribute according to the master config file.
Expand All @@ -46,6 +46,9 @@ def __init__(self, config_file: Text = None) -> None:
self.config_file = config_file
self.server = None
self.master_config = MasterConfig()
self.enable_ha = enable_ha
self.server_uri = server_uri
self.ttl_ms = ttl_ms

def start(self,
is_block=False) -> None:
Expand All @@ -61,10 +64,20 @@ def start(self,
global GLOBAL_MASTER_CONFIG
GLOBAL_MASTER_CONFIG = self.master_config
logging.info("AI Flow Master Config {}".format(GLOBAL_MASTER_CONFIG))
self.server = AIFlowServer(store_uri=self.master_config.get_db_uri(),
port=str(self.master_config.get_master_port()),
start_default_notification=self.master_config.start_default_notification(),
notification_uri=self.master_config.get_notification_uri())
if not self.master_config.get_enable_ha():
self.server = AIFlowServer(
store_uri=self.master_config.get_db_uri(),
port=str(self.master_config.get_master_port()),
start_default_notification=self.master_config.start_default_notification(),
notification_uri=self.master_config.get_notification_uri())
else:
self.server = HighAvailableAIFlowServer(
store_uri=self.master_config.get_db_uri(),
port=str(self.master_config.get_master_port()),
start_default_notification=self.master_config.start_default_notification(),
notification_uri=self.master_config.get_notification_uri(),
server_uri=self.master_config.get_master_ip() + ":" + self.master_config.get_master_port(),
ttl_ms=self.master_config.get_ha_ttl_ms())
self.server.run(is_block=is_block)

def stop(self, clear_sql_lite_db_file=True) -> None:
Expand Down
29 changes: 28 additions & 1 deletion flink-ai-flow/ai_flow/application_master/master_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def get_sql_lite_db_file(self):
db_uri = self.get_db_uri()
return db_uri[10:]

def get_master_ip(self):
return self["master_ip"]

def get_master_port(self):
return self["master_port"]

Expand All @@ -49,13 +52,38 @@ def get_db_uri(self):
def get_db_type(self) -> DBType:
return DBType.value_of(self["db_type"])

def get_enable_ha(self):
if "enable_ha" in self:
raw = self.get("enable_ha")
assert str(raw).strip().lower() in {"true", "false"}
return bool(str(raw).strip().lower())
else:
return False

def get_ha_ttl_ms(self):
if "ha_ttl_ms" in self:
return int(self.get("ha_ttl_ms"))
else:
return 10000

def set_master_port(self, value):
self["master_port"] = value

def set_master_ip(self, value):
self["master_ip"] = value

def set_db_uri(self, db_type: DBType, uri: Text):
self["db_type"] = db_type.value
self["db_uri"] = uri

def set_enable_ha(self, enable_ha):
assert str(enable_ha).strip().lower() in {"true", "false"}
self["enable_ha"] = str(enable_ha)

def set_ha_ttl_ms(self, ha_ttl_ms):
assert int(str(ha_ttl_ms).strip()) >= 0
self["ha_ttl_ms"] = str(ha_ttl_ms).strip()

def start_default_notification(self)->bool:
if "start_default_notification" in self and self['start_default_notification'] is False:
return False
Expand All @@ -67,4 +95,3 @@ def set_notification_uri(self, notification_uri):

def get_notification_uri(self):
return self.get('notification_uri')

12 changes: 9 additions & 3 deletions flink-ai-flow/ai_flow/plugins/kubernetes_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(self):
self.status_map = {}
self.running = True
self.lock = threading.Lock()
self.started = False

def listen(self):

Expand Down Expand Up @@ -130,9 +131,14 @@ def start_listen(self):
if not kubernetes_util.kubernetes_cluster_available:
logging.info("Kubernetes cluster is not available.")
return
self.setDaemon(daemonic=True)
self.setName(self.platform + "_listener")
self.start()
if not self.started:
self.setDaemon(daemonic=True)
self.setName(self.platform + "_listener")
self.start()
self.started = True
else:
logging.error("The LocalJobStatusListener can not be started twice! "
"Please check the code.")

def stop_listen(self):
if not kubernetes_util.kubernetes_cluster_available:
Expand Down
12 changes: 9 additions & 3 deletions flink-ai-flow/ai_flow/plugins/local_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, time_interval=2) -> None:
self.status_map = {}
self.running = True
self.lock = threading.Lock()
self.started = False

def listen(self):
while self.running:
Expand Down Expand Up @@ -126,9 +127,14 @@ def run(self):
traceback.print_exc()

def start_listen(self):
self.setDaemon(daemonic=True)
self.setName(self.platform + "_listener")
self.start()
if not self.started:
self.setDaemon(daemonic=True)
self.setName(self.platform + "_listener")
self.start()
self.started = True
else:
logging.error("The LocalJobStatusListener can not be started twice! "
"Please check the code.")

def stop_listen(self):
self.running = False
Expand Down
42 changes: 42 additions & 0 deletions flink-ai-flow/ai_flow/project/project_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,47 @@ def get_schedule_interval(self):
def set_schedule_interval(self, schedule_interval):
self["schedule_interval"] = schedule_interval

def get_enable_ha(self):
if "enable_ha" in self:
raw = self["enable_ha"]
assert str(raw).strip().lower() in {"true", "false"}
return bool(str(raw).strip().lower())
else:
return False

def set_enable_ha(self, enable_ha):
assert str(enable_ha).strip().lower() in {"true", "false"}
self["enable_ha"] = enable_ha

def _get_time_interval_ms(self, key, default):
if key in self:
raw = self[key]
assert int(str(raw).strip()) >= 0
return int(str(raw).strip())
else:
return default

def _set_time_interval_ms(self, key, value):
assert int(str(value).strip()) >= 0
self[key] = value

def get_list_member_interval_ms(self):
return self._get_time_interval_ms("list_member_interval_ms", 5000)

def set_list_member_interval_ms(self, list_member_interval_ms):
self._set_time_interval_ms("list_member_interval_ms", list_member_interval_ms)

def get_retry_interval_ms(self):
return self._get_time_interval_ms("retry_interval_ms", 1000)

def set_retry_interval_ms(self, retry_interval_ms):
self._set_time_interval_ms("retry_interval_ms", retry_interval_ms)

def get_retry_timeout_ms(self):
return self._get_time_interval_ms("retry_timeout_ms", 10000)

def set_retry_timeout_ms(self, retry_timeout_ms):
self._set_time_interval_ms("retry_timeout_ms", retry_timeout_ms)


_default_project_config = ProjectConfig()
Loading

0 comments on commit 8d6abcb

Please sign in to comment.