Skip to content

Commit

Permalink
Set cluster inactive on SIGINT and active on start (#1100)
Browse files Browse the repository at this point in the history
* Set cluster inactive on SIGINT and active on start

* Make set_cluster_active available for all sinks

* Add logging on setting cluster active

* Close websocket on SIGINT

* set_cluster_active change updated_at
  • Loading branch information
LeaveMyYard authored Oct 20, 2023
1 parent c3d2c8a commit 4c6f0f9
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 6 deletions.
13 changes: 13 additions & 0 deletions src/robusta/core/playbooks/playbooks_event_handler_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from robusta.core.reporting import MarkdownBlock
from robusta.core.reporting.base import Finding
from robusta.core.reporting.consts import SYNC_RESPONSE_SINK
from robusta.core.sinks.robusta import RobustaSink
from robusta.core.sinks.robusta.dal.model_conversion import ModelConversion
from robusta.model.alert_relabel_config import AlertRelabel
from robusta.model.config import Registry
Expand Down Expand Up @@ -334,8 +335,20 @@ def is_healthy(
return True
return all(sink.is_healthy() for sink in sinks_registry.get_all().values())

def set_cluster_active(self, active: bool):
logging.info(f"Setting cluster active to {active}")
for sink in self.registry.get_sinks().get_all().values():
sink.set_cluster_active(active)

def handle_sigint(self, sig, frame):
logging.info("SIGINT handler called")

if not self.is_healthy(): # dump stuck trace only when the runner is unhealthy
StackTracer.dump()

receiver = self.registry.get_receiver()
if receiver is not None:
receiver.stop()

self.set_cluster_active(False)
sys.exit(0)
13 changes: 13 additions & 0 deletions src/robusta/core/sinks/robusta/dal/supabase_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,16 @@ def __update_token_patch(self, event, session):
logging.debug(f"Event {event}, Session {session}")
if session and event == "TOKEN_REFRESHED":
self.client.postgrest.auth(session.access_token)

def set_cluster_active(self, active: bool) -> None:
try:
(
self.client.table(CLUSTERS_STATUS_TABLE)
.update({"active": active, "updated_at": "now()"})
.eq("cluster_id", self.cluster)
.eq("account_id", self.account_id)
.execute()
)
except Exception as e:
logging.error(f"Failed to set cluster status active=False error: {e}")
self.handle_supabase_error()
18 changes: 13 additions & 5 deletions src/robusta/core/sinks/robusta/robusta_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
self.__thread.start()
self.__watchdog_thread.start()

def set_cluster_active(self, active: bool):
self.dal.set_cluster_active(active)

def __init_service_resolver(self):
"""
Init service resolver from the service stored in storage.
Expand Down Expand Up @@ -342,7 +345,9 @@ def __to_node_info(cls, node: Union[V1Node, Node]) -> Dict:
return node_info

@classmethod
def __from_api_server_node(cls, api_server_node: Union[V1Node, Node], pod_requests_list: List[PodResources]) -> NodeInfo:
def __from_api_server_node(
cls, api_server_node: Union[V1Node, Node], pod_requests_list: List[PodResources]
) -> NodeInfo:
addresses = api_server_node.status.addresses or []
external_addresses = [address for address in addresses if "externalip" in address.type.lower()]
external_ip = ",".join([addr.address for addr in external_addresses])
Expand All @@ -353,8 +358,12 @@ def __from_api_server_node(cls, api_server_node: Union[V1Node, Node], pod_reques
capacity = api_server_node.status.capacity or {}
allocatable = api_server_node.status.allocatable or {}
# V1Node and Node use snake case and camelCase respectively, handle this for more than 1 word attributes.
creation_ts = getattr(api_server_node.metadata, "creation_timestamp", None) or getattr(api_server_node.metadata, "creationTimestamp", None)
version = getattr(api_server_node.metadata, "resource_version", None) or getattr(api_server_node.metadata, "resourceVersion", None)
creation_ts = getattr(api_server_node.metadata, "creation_timestamp", None) or getattr(
api_server_node.metadata, "creationTimestamp", None
)
version = getattr(api_server_node.metadata, "resource_version", None) or getattr(
api_server_node.metadata, "resourceVersion", None
)
return NodeInfo(
name=api_server_node.metadata.name,
node_creation_time=str(creation_ts),
Expand All @@ -371,7 +380,7 @@ def __from_api_server_node(cls, api_server_node: Union[V1Node, Node], pod_reques
pods_count=len(pod_requests_list),
pods=",".join([pod_req.pod_name for pod_req in pod_requests_list]),
node_info=cls.__to_node_info(api_server_node),
resource_version=int(version) if version else 0
resource_version=int(version) if version else 0,
)

def __publish_new_nodes(self, current_nodes: V1NodeList, node_requests: Dict[str, List[PodResources]]):
Expand Down Expand Up @@ -612,7 +621,6 @@ def __update_node(self, new_node: Node, operation: K8sOperationType):
self.__discovery_metrics.on_nodes_updated(1)

def __update_job(self, new_job: Job, operation: K8sOperationType):

new_info = JobInfo.from_api_server(new_job, [])
job_key = new_info.get_service_key()
with self.services_publish_lock:
Expand Down
3 changes: 3 additions & 0 deletions src/robusta/core/sinks/sink_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ def is_healthy(self) -> bool:

def handle_service_diff(self, new_obj: Any, operation: K8sOperationType):
pass

def set_cluster_active(self, active: bool):
pass
1 change: 0 additions & 1 deletion src/robusta/model/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def construct_new_sinks(
existing_sinks: Dict[str, SinkBase],
registry,
) -> Dict[str, SinkBase]:

new_sink_names = [sink_config.get_name() for sink_config in new_sinks_config]
# remove deleted sinks
deleted_sink_names = [sink_name for sink_name in existing_sinks.keys() if sink_name not in new_sink_names]
Expand Down
2 changes: 2 additions & 0 deletions src/robusta/runner/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def main():
logging.info(f"Running alerts workers pool of {ALERT_BUILDER_WORKERS}")

Web.init(event_handler, loader)

signal.signal(signal.SIGINT, event_handler.handle_sigint)
event_handler.set_cluster_active(True)
Web.run() # blocking
loader.close()

Expand Down

0 comments on commit 4c6f0f9

Please sign in to comment.