diff --git a/src/robusta/integrations/mail/sender.py b/src/robusta/integrations/mail/sender.py
index 5f01365f8..7ae711b0c 100644
--- a/src/robusta/integrations/mail/sender.py
+++ b/src/robusta/integrations/mail/sender.py
@@ -11,7 +11,6 @@
     LinksBlock,
     LinkProp,
     MarkdownBlock,
-    ScanReportBlock,
 )
 from robusta.core.reporting.consts import EnrichmentAnnotation, FindingSource
 from robusta.core.sinks.transformer import Transformer
@@ -68,7 +67,9 @@ def send_finding_via_email(self, finding: Finding, platform_enabled: bool):
         if finding.source == FindingSource.PROMETHEUS:
             blocks.append(MarkdownBlock(f"{Emojis.Alert.value} *Alert:* {finding.description}"))
         elif finding.source == FindingSource.KUBERNETES_API_SERVER:
-            blocks.append(MarkdownBlock(f"{Emojis.K8Notification.value} *K8s event detected:* {finding.description}"))
+            blocks.append(
+                MarkdownBlock(f"{Emojis.K8Notification.value} *K8s event detected:* {finding.description}")
+            )
         else:
             blocks.append(MarkdownBlock(f"{Emojis.K8Notification.value} *Notification:* {finding.description}"))

diff --git a/src/robusta/model/config.py b/src/robusta/model/config.py
index 6b4338ad7..62710ff4f 100644
--- a/src/robusta/model/config.py
+++ b/src/robusta/model/config.py
@@ -51,32 +51,27 @@ def construct_new_sinks(
         new_sinks = existing_sinks.copy()
         # create new sinks, or update existing if changed
         for sink_config in new_sinks_config:
-            try:
-                # temporary workaround to skip the default and unconfigured robusta token
-                if (
-                    isinstance(sink_config, RobustaSinkConfigWrapper)
-                    and sink_config.robusta_sink.token == ""
-                ):
-                    continue
-                if sink_config.get_name() not in new_sinks.keys():
-                    logging.info(f"Adding {type(sink_config)} sink named {sink_config.get_name()}")
-                    new_sinks[sink_config.get_name()] = SinkFactory.create_sink(sink_config, registry)
-                elif (
-                    sink_config.get_params() != new_sinks[sink_config.get_name()].params
-                    or new_sinks[sink_config.get_name()].is_global_config_changed()
-                ):
-                    config_change_msg = (
-                        "due to global config change"
-                        if new_sinks[sink_config.get_name()].is_global_config_changed()
-                        else "due to param change"
-                    )
-                    logging.info(
-                        f"Updating {type(sink_config)} sink named {sink_config.get_name()} {config_change_msg}"
-                    )
-                    new_sinks[sink_config.get_name()].stop()
-                    new_sinks[sink_config.get_name()] = SinkFactory.create_sink(sink_config, registry)
-            except Exception as e:
-                logging.error(f"Error configuring sink {sink_config.get_name()} of type {type(sink_config)}: {e}")
+            # temporary workaround to skip the default and unconfigured robusta token
+            if (
+                isinstance(sink_config, RobustaSinkConfigWrapper)
+                and sink_config.robusta_sink.token == ""
+            ):
+                continue
+            if sink_config.get_name() not in new_sinks.keys():
+                logging.info(f"Adding {type(sink_config)} sink named {sink_config.get_name()}")
+                new_sinks[sink_config.get_name()] = SinkFactory.create_sink(sink_config, registry)
+            elif (
+                sink_config.get_params() != new_sinks[sink_config.get_name()].params
+                or new_sinks[sink_config.get_name()].is_global_config_changed()
+            ):
+                config_change_msg = (
+                    "due to global config change"
+                    if new_sinks[sink_config.get_name()].is_global_config_changed()
+                    else "due to param change"
+                )
+                logging.info(f"Updating {type(sink_config)} sink named {sink_config.get_name()} {config_change_msg}")
+                new_sinks[sink_config.get_name()].stop()
+                new_sinks[sink_config.get_name()] = SinkFactory.create_sink(sink_config, registry)
         return new_sinks

diff --git a/src/robusta/runner/config_loader.py b/src/robusta/runner/config_loader.py
index 619c6386c..5b0cfa7f3 100644
--- a/src/robusta/runner/config_loader.py
+++ b/src/robusta/runner/config_loader.py
@@ -3,6 +3,7 @@
 import logging
 import os
 import pkgutil
+import signal
 import subprocess
 import sys
 import threading
@@ -40,7 +41,6 @@


 class ConfigLoader:
-
     # the structure on disk is:
     # root_playbook_path/
     # |- playbook_dir1
@@ -147,7 +147,7 @@ def __import_playbooks_package(cls, actions_registry: ActionsRegistry, package_n
                 # Reload is required for modules that are already loaded
                 m = importlib.reload(importlib.import_module(module_name))
                 playbook_actions = getmembers(m, Action.is_action)
-                for (action_name, action_func) in playbook_actions:
+                for action_name, action_func in playbook_actions:
                     actions_registry.add_action(action_func)
             except Exception:
                 logging.error(f"failed to module {playbooks_module}", exc_info=True)
@@ -226,9 +226,12 @@ def __reload_playbook_packages(self, change_name):

            except Exception:
                logging.error(
-                    "unknown error reloading playbooks. will try again when they next change",
+                    "Error (re)loading playbooks/related resources, exiting.",
                     exc_info=True,
                 )
+                # Kill the whole process group (which means this process and all of its descendant
+                # processes). The rest of the runner shutdown happens in robusta.runner.process_setup.
+                os.killpg(os.getpgid(0), signal.SIGTERM)

     @classmethod
     def __prepare_runtime_config(
diff --git a/src/robusta/runner/main.py b/src/robusta/runner/main.py
index 4c53698b9..674d94ccf 100644
--- a/src/robusta/runner/main.py
+++ b/src/robusta/runner/main.py
@@ -12,6 +12,7 @@
 from robusta.patch.patch import create_monkey_patches
 from robusta.runner.config_loader import ConfigLoader
 from robusta.runner.log_init import init_logging, logging
+from robusta.runner.process_setup import process_setup
 from robusta.runner.ssl_utils import add_custom_certificate
 from robusta.runner.telemetry_service import TelemetryLevel, TelemetryService
 from robusta.runner.web import Web
@@ -19,6 +20,7 @@


 def main():
+    process_setup()
     init_logging()
     ServerStart.set()
     if add_custom_certificate(ADDITIONAL_CERTIFICATE):
diff --git a/src/robusta/runner/process_setup.py b/src/robusta/runner/process_setup.py
new file mode 100644
index 000000000..a5ff4d404
--- /dev/null
+++ b/src/robusta/runner/process_setup.py
@@ -0,0 +1,18 @@
+import os
+import sys
+
+
+def process_setup():
+    if os.fork():
+        # Parent process, pid 1 in our deployment scenario. Wait (blocking - doesn't
+        # utilize any CPU) for the forked "main" process to exit (if it ever does).
+        os.wait()
+        # At this point we are sure no subprocesses are running, so we can safely
+        # exit the pid 1 process, effectively causing the Docker image (and thus
+        # the k8s pod) to terminate.
+        sys.exit(1)
+
+    # Child process; create a process group to conveniently terminate the process
+    # along with its subprocesses if need be via killpg. Currently the only use is in
+    # robusta.runner.config_loader.ConfigLoader.__reload_playbook_packages.
+    os.setpgrp()
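
Note on the process model introduced by process_setup.py and the killpg() call in ConfigLoader.__reload_playbook_packages: the runner now forks at startup, the parent (pid 1 in the pod) blocks in os.wait(), and the child makes itself a process group leader. On a fatal playbook-reload error, the child SIGTERMs its own group, taking down itself and every descendant; the parent's wait() then returns and it exits, so the container terminates and Kubernetes restarts the pod. The following is a minimal standalone sketch of that pattern, not code from this PR (POSIX-only; the names die_with_descendants and the __main__ demo are illustrative assumptions):

    import os
    import signal
    import sys
    import time


    def process_setup():
        if os.fork():
            # Parent (pid 1 in a container): block until the worker exits,
            # then exit non-zero so the container itself terminates.
            os.wait()
            sys.exit(1)
        # Worker: become a process group leader, so a single killpg() can
        # later terminate the worker together with every subprocess it spawned.
        os.setpgrp()


    def die_with_descendants():
        # Mirrors the error path added to ConfigLoader: SIGTERM the worker's
        # whole group. The parent stayed in the original group, so it survives,
        # returns from wait(), and exits.
        os.killpg(os.getpgid(0), signal.SIGTERM)


    if __name__ == "__main__":
        process_setup()
        print(f"worker pid={os.getpid()} pgid={os.getpgid(0)}")
        time.sleep(1)  # stand-in for the runner's real work
        die_with_descendants()  # simulate a fatal reload error

Because the parent never belongs to the child's group, killpg() cannot reap pid 1 itself; the parent only exits via the wait()/sys.exit(1) path, which is what turns a reload failure into a clean pod restart instead of a silently degraded runner.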