Perform BaseTrigger.build_execution_event in a separate ProcessPool (#1197)

* Perform BaseTrigger.build_execution_event in a separate ProcessPool

* fix pydantic/_thread.lock interaction errors

* ProcessPoolExecutor init simplification

* add missing result()

* monkey patching

* don't reapply monkey patches if already applied; larger execution event process pool

* create all monkey patches on discovery
remove separate process pool for alerts
take worker num from env var

---------

Co-authored-by: Arik Alon <alon.arik@gmail.com>
Robert Szefler and arikalon1 authored Dec 6, 2023
1 parent 465f140 commit b9d9042
Showing 11 changed files with 54 additions and 38 deletions.
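At its core, the commit replaces the alerts-only worker pool with a shared process pool that every trigger's event parsing goes through. Below is a minimal sketch of that pattern — illustrative names only, not the actual Robusta API, assuming nothing beyond the standard library; the pool size is read from an env var, mirroring the EVENT_PARSING_WORKERS setting introduced below.

    import os
    from concurrent.futures.process import ProcessPoolExecutor

    # Pool size taken from an env var, as the commit does with EVENT_PARSING_WORKERS.
    _pool = ProcessPoolExecutor(max_workers=int(os.environ.get("EVENT_PARSING_WORKERS", "1")))

    def _parse_event_task(payload: dict) -> dict:
        # Runs in a worker process; must be a module-level (hence picklable) callable.
        return {"parsed": payload}

    def parse_event(payload: dict) -> dict:
        # Runs in the parent process; submit() + result() keeps the public call
        # synchronous while the CPU-heavy parsing happens in another process.
        return _pool.submit(_parse_event_task, payload).result()

    if __name__ == "__main__":
        print(parse_event({"alert": "HighCPU"}))  # -> {'parsed': {'alert': 'HighCPU'}}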
1 change: 0 additions & 1 deletion src/robusta/api/__init__.py
@@ -37,7 +37,6 @@
     VideoEnricherParams,
 )
 from robusta.core.model.env_vars import (
-    ALERT_BUILDER_WORKERS,
     CLUSTER_DOMAIN,
     CLUSTER_STATUS_PERIOD_SEC,
     CUSTOM_PLAYBOOKS_ROOT,
6 changes: 3 additions & 3 deletions src/robusta/core/discovery/discovery.py
@@ -41,7 +41,7 @@
 from robusta.core.model.jobs import JobInfo
 from robusta.core.model.namespaces import NamespaceInfo
 from robusta.core.model.services import ContainerInfo, ServiceConfig, ServiceInfo, VolumeInfo
-from robusta.patch.patch import patch_on_pod_conditions
+from robusta.patch.patch import create_monkey_patches
 from robusta.utils.cluster_provider_discovery import cluster_provider
 from robusta.utils.stack_tracer import StackTracer

@@ -136,7 +136,7 @@ def create_service_info(obj: Union[Deployment, DaemonSet, StatefulSet, Pod, Repl

     @staticmethod
     def discovery_process() -> DiscoveryResults:
-        patch_on_pod_conditions()
+        create_monkey_patches()
         Discovery.stacktrace_thread_active = True
         threading.Thread(target=Discovery.stack_dump_on_signal).start()
         pods_metadata: List[V1ObjectMeta] = []
@@ -349,7 +349,7 @@ def discovery_process() -> DiscoveryResults:
         continue_ref: Optional[str] = None
         for _ in range(DISCOVERY_MAX_BATCHES):
             secrets = client.CoreV1Api().list_secret_for_all_namespaces(
-                label_selector=f"owner=helm", _continue=continue_ref
+                label_selector="owner=helm", _continue=continue_ref
             )
             if not secrets.items:
                 break
3 changes: 1 addition & 2 deletions src/robusta/core/model/env_vars.py
@@ -34,8 +34,7 @@ def load_bool(env_var, default: bool):
 DEFAULT_TIMEZONE = pytz.timezone(os.environ.get("DEFAULT_TIMEZONE", "UTC"))
 NUM_EVENT_THREADS = int(os.environ.get("NUM_EVENT_THREADS", 20))
 INCOMING_EVENTS_QUEUE_MAX_SIZE = int(os.environ.get("INCOMING_EVENTS_QUEUE_MAX_SIZE", 500))
-ALERT_BUILDER_WORKERS = int(os.environ.get("ALERT_BUILDER_WORKERS", 5))
-ALERTS_WORKERS_POOL = load_bool("ALERTS_WORKERS_POOL", False)
+EVENT_PARSING_WORKERS = int(os.environ.get("EVENT_PARSING_WORKERS", 1))

 FLOAT_PRECISION_LIMIT = int(os.environ.get("FLOAT_PRECISION_LIMIT", 11))
20 changes: 19 additions & 1 deletion src/robusta/core/playbooks/base_trigger.py
@@ -1,10 +1,13 @@
 import abc
+from concurrent.futures.process import ProcessPoolExecutor
 from typing import Dict, List, Optional, Type

 from pydantic import BaseModel

+from robusta.core.model.env_vars import EVENT_PARSING_WORKERS
 from robusta.core.model.events import ExecutionBaseEvent
 from robusta.core.reporting.base import Finding
+from robusta.patch.patch import create_monkey_patches
 from robusta.utils.documented_pydantic import DocumentedModel


@@ -19,6 +22,9 @@ def get_event_description(self) -> str:
         return "NA"


+build_execution_event_process_pool = ProcessPoolExecutor(max_workers=EVENT_PARSING_WORKERS)
+
+
 class BaseTrigger(DocumentedModel):
     def get_trigger_event(self) -> str:
         pass
@@ -29,7 +35,19 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):
     def build_execution_event(
         self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]]
     ) -> Optional[ExecutionBaseEvent]:
-        pass
+        return build_execution_event_process_pool.submit(
+            self._build_execution_event_trampoline, event, sink_findings
+        ).result()
+
+    def _build_execution_event_trampoline(self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]]):
+        create_monkey_patches()
+        return self._build_execution_event(event, sink_findings)
+
+    def _build_execution_event(
+        self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]]
+    ) -> Optional[ExecutionBaseEvent]:
+        # This is meant for running in a separate process
+        raise NotImplementedError

     @staticmethod
     def get_execution_event_type() -> Type[ExecutionBaseEvent]:
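Two details of this file are worth spelling out. First, submitting work to a ProcessPoolExecutor pickles the callable and its arguments, which is presumably what the "fix pydantic/_thread.lock interaction errors" bullet in the commit message refers to: objects holding a lock cannot cross the process boundary. A standalone demonstration of that constraint (standard library only):

    import pickle
    import threading

    class HoldsLock:
        def __init__(self) -> None:
            self.lock = threading.Lock()  # locks are process-local OS resources

    try:
        pickle.dumps(HoldsLock())
    except TypeError as e:
        print(f"not picklable: {e}")  # e.g. cannot pickle '_thread.lock' object

Second, the trampoline calls create_monkey_patches() because worker processes don't reliably share the parent's patched state (never under the "spawn" start method, and not for patches applied after a fork), so each task ensures the patches exist before doing real work. The diffs to the concrete triggers below show that subclasses now override _build_execution_event rather than build_execution_event.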
2 changes: 1 addition & 1 deletion src/robusta/core/sinks/robusta/dal/supabase_dal.py
@@ -477,7 +477,7 @@ def publish_cluster_nodes(self, node_count: int, pod_count: int):
             logging.error(f"Failed to publish node count {data} error: {e}")
             self.handle_supabase_error()

-        logging.info(f"cluster nodes: {UPDATE_CLUSTER_NODE_COUNT} => {data}")
+        logging.debug(f"cluster nodes: {UPDATE_CLUSTER_NODE_COUNT} => {data}")

     def persist_events_block(self, block: EventsBlock):
         db_events = []
27 changes: 17 additions & 10 deletions src/robusta/core/triggers/helm_releases_triggers.py
@@ -17,6 +17,7 @@
 FAILED_STATUSES = ["failed", "unknown"]
 DEPLOYED_STATUSES = ["deployed"]

+
 class IncomingHelmReleasesEventPayload(BaseModel):
     """
     The format of incoming payloads containing helm release events. This is mostly used for deserialization.
@@ -33,8 +34,10 @@ def get_event_name(self) -> str:
         return HelmReleasesTriggerEvent.__name__

     def get_event_description(self) -> str:
-        return f"HelmReleases-{self.helm_release.namespace}/{self.helm_release.name}/{self.helm_release.chart.metadata.version}-" \
-               f"{self.helm_release.info.status}"
+        return (
+            f"HelmReleases-{self.helm_release.namespace}/{self.helm_release.name}/{self.helm_release.chart.metadata.version}-"
+            f"{self.helm_release.info.status}"
+        )


 @dataclass
@@ -93,7 +96,9 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):
         if event.helm_release.info.status in UNHEALTHY_STATUSES:
             rate_limiter_id = f"{event.helm_release.namespace}:{event.helm_release.name}"
         else:
-            rate_limiter_id = f"{event.helm_release.namespace}:{event.helm_release.name}-{last_deployed_utc.isoformat()}"
+            rate_limiter_id = (
+                f"{event.helm_release.namespace}:{event.helm_release.name}-{last_deployed_utc.isoformat()}"
+            )
         # if the server start time is greater than the last deployement time of the release then dont fire the trigger
         # eg: start -> 5pm, last_deployed -> 6pm, delta -> - 1hr. => fire
         #     start -> 3pm, last_deployed -> 2pm, delta -> 1hr. => dont_fire
@@ -111,8 +116,8 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):

         return can_fire

-    def build_execution_event(
-        self, event: HelmReleasesTriggerEvent, sink_findings: Dict[str, List[Finding]]
+    def _build_execution_event(
+        self, event: HelmReleasesTriggerEvent, sink_findings: Dict[str, List[Finding]]
     ) -> Optional[ExecutionBaseEvent]:
         if not isinstance(event, HelmReleasesTriggerEvent):
             return
@@ -127,11 +132,13 @@ def get_execution_event_type() -> type:


 class HelmReleaseUnhealthyTrigger(HelmReleaseBaseTrigger):
-    def __init__(self,
-                 rate_limit: int,
-                 names: List[str] = [],
-                 namespace: str = None,
-                 duration: int = 900, ):
+    def __init__(
+        self,
+        rate_limit: int,
+        names: List[str] = [],
+        namespace: str = None,
+        duration: int = 900,
+    ):
         super().__init__(
             statuses=UNHEALTHY_STATUSES,
             names=names,
2 changes: 1 addition & 1 deletion src/robusta/integrations/kubernetes/base_triggers.py
@@ -117,7 +117,7 @@ def __parse_kubernetes_objs(cls, k8s_payload: IncomingK8sEventPayload):
         old_obj = cls.__load_hikaru_obj(k8s_payload.oldObj, model_class)
         return obj, old_obj

-    def build_execution_event(
+    def _build_execution_event(
         self, event: K8sTriggerEvent, sink_findings: Dict[str, List[Finding]]
     ) -> Optional[ExecutionBaseEvent]:
         # we can't use self.get_execution_event_type() because for KubernetesAnyAllChangesTrigger we need to filter out
12 changes: 2 additions & 10 deletions src/robusta/integrations/prometheus/trigger.py
@@ -1,11 +1,9 @@
 import logging
-from concurrent.futures.process import ProcessPoolExecutor
 from typing import Dict, List, NamedTuple, Optional, Type, Union

 from hikaru.model.rel_1_26 import DaemonSet, HorizontalPodAutoscaler, Job, Node, NodeList, StatefulSet
 from pydantic.main import BaseModel

-from robusta.core.model.env_vars import ALERT_BUILDER_WORKERS, ALERTS_WORKERS_POOL
 from robusta.core.model.events import ExecutionBaseEvent
 from robusta.core.playbooks.base_trigger import BaseTrigger, TriggerEvent
 from robusta.core.reporting.base import Finding
@@ -93,7 +91,7 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):

         return True

-    def build_execution_event(
+    def _build_execution_event(
         self, event: PrometheusTriggerEvent, sink_findings: Dict[str, List[Finding]]
     ) -> Optional[ExecutionBaseEvent]:
         return AlertEventBuilder.build_event(event, sink_findings)
@@ -108,8 +106,6 @@ class PrometheusAlertTriggers(BaseModel):


 class AlertEventBuilder:
-    executor = ProcessPoolExecutor(max_workers=ALERT_BUILDER_WORKERS)
-
     @classmethod
     def __find_node_by_ip(cls, ip) -> Optional[Node]:
         nodes: NodeList = NodeList.listNode().obj
@@ -184,8 +180,4 @@ def _build_event_task(
     def build_event(
         event: PrometheusTriggerEvent, sink_findings: Dict[str, List[Finding]]
     ) -> Optional[ExecutionBaseEvent]:
-        if ALERTS_WORKERS_POOL:
-            future = AlertEventBuilder.executor.submit(AlertEventBuilder._build_event_task, event, sink_findings)
-            return future.result()
-        else:
-            return AlertEventBuilder._build_event_task(event, sink_findings)
+        return AlertEventBuilder._build_event_task(event, sink_findings)
6 changes: 3 additions & 3 deletions src/robusta/integrations/receiver.py
@@ -55,6 +55,7 @@ def validate_value(cls, v: str) -> dict:
         # Slack value is sent as a stringified json, so we need to parse it before validation
         return json.loads(v)

+
 class SlackActionsMessage(BaseModel):
     actions: List[SlackActionRequest]

@@ -168,7 +169,6 @@ def _process_action_sync(self, action: ExternalActionRequest, validate_timestamp
     def _parse_websocket_message(
         message: Union[str, bytes, bytearray]
     ) -> Union[SlackActionsMessage, ExternalActionRequest]:
-
         try:
             return SlackActionsMessage.parse_raw(message)  # this is slack callback format
         except ValidationError:
@@ -276,7 +276,7 @@ def validate_light_action(self, action_request: ExternalActionRequest) -> Valida

     @staticmethod
     def validate_action_request_signature(
-            action_request: ExternalActionRequest, signing_key: str
+        action_request: ExternalActionRequest, signing_key: str
     ) -> ValidationResponse:
         generated_signature = sign_action_request(action_request.body, signing_key)
         if hmac.compare_digest(generated_signature, action_request.signature):
@@ -330,7 +330,7 @@ def validate_with_private_key(self, action_request: ExternalActionRequest, signi

     @classmethod
     def __extract_key_and_validate(
-            cls, encrypted: str, private_key: RSAPrivateKey, body: ActionRequestBody
+        cls, encrypted: str, private_key: RSAPrivateKey, body: ActionRequestBody
     ) -> (bool, Optional[UUID]):
         try:
             plain = private_key.decrypt(
8 changes: 7 additions & 1 deletion src/robusta/patch/patch.py
@@ -21,8 +21,13 @@ def get_origin(tp):

 NoneType = type(None)

+monkey_patches_applied = False
+

 def create_monkey_patches():
+    global monkey_patches_applied
+    if monkey_patches_applied:
+        return
     # The 2 patched Hikaru methods are very expensive CPU wise. We patched them, and using cached attributes
     # on the hikaru class, so that we perform the expensive procedure only once
     logging.info("Creating hikaru monkey patches")
@@ -39,13 +44,14 @@ def create_monkey_patches():
     logging.info("Creating kubernetes ContainerImage monkey patch")
     EventsV1Event.event_time = EventsV1Event.event_time.setter(event_time)
     patch_on_pod_conditions()
+    monkey_patches_applied = True


 def patch_on_pod_conditions():
     # This fixes https://github.com/kubernetes-client/python/issues/2056 before the
     # k8s people take care of it (it's urgent for us).

-    logging.info("Creating kubernetes PodFailurePolicyRUle.on_pod_conditions monkey patch")
+    logging.debug("Creating kubernetes PodFailurePolicyRUle.on_pod_conditions monkey patch")

     def patched_setter(self, on_pod_conditions):
         self._on_pod_conditions = on_pod_conditions
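The new module-level flag makes create_monkey_patches() idempotent, so worker tasks can call it unconditionally on every invocation. A minimal sketch of the guard pattern, with illustrative names:

    _patches_applied = False

    def apply_patches() -> None:
        global _patches_applied
        if _patches_applied:
            return
        # ... install the actual monkey patches here (elided) ...
        _patches_applied = True

    apply_patches()  # first call in a process does the work
    apply_patches()  # later calls return immediately

Note the flag is per process: each pool worker applies the patches once, on its first task.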
5 changes: 0 additions & 5 deletions src/robusta/runner/main.py
@@ -2,8 +2,6 @@

 from robusta.core.model.env_vars import (
     ADDITIONAL_CERTIFICATE,
-    ALERT_BUILDER_WORKERS,
-    ALERTS_WORKERS_POOL,
     ENABLE_TELEMETRY,
     ROBUSTA_TELEMETRY_ENDPOINT,
     SEND_ADDITIONAL_TELEMETRY,
@@ -41,9 +39,6 @@ def main():
     else:
         logging.info("Telemetry is disabled.")

-    if ALERTS_WORKERS_POOL:
-        logging.info(f"Running alerts workers pool of {ALERT_BUILDER_WORKERS}")
-
     Web.init(event_handler, loader)

     signal.signal(signal.SIGINT, event_handler.handle_sigint)
