Skip to content

Commit

Permalink
Perform BaseTrigger.build_execution_event in a separate ProcessPool
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Szefler committed Dec 4, 2023
1 parent d7216a9 commit 112e68b
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 13 deletions.
12 changes: 11 additions & 1 deletion src/robusta/core/playbooks/base_trigger.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
from concurrent.futures.process import ProcessPoolExecutor
from typing import Dict, List, Optional, Type

from pydantic import BaseModel
Expand All @@ -20,6 +21,9 @@ def get_event_description(self) -> str:


class BaseTrigger(DocumentedModel):
# TODO what should the pool size be?
executor = ProcessPoolExecutor(max_workers=1)

def get_trigger_event(self) -> str:
pass

Expand All @@ -29,7 +33,13 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):
def build_execution_event(
self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]]
) -> Optional[ExecutionBaseEvent]:
pass
return BaseTrigger.executor.submit(self._build_execution_event, event, sink_findings)

def _build_execution_event(
self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]]
) -> Optional[ExecutionBaseEvent]:
# This is meant for running in a separate process
raise NotImplementedError

@staticmethod
def get_execution_event_type() -> Type[ExecutionBaseEvent]:
Expand Down
27 changes: 17 additions & 10 deletions src/robusta/core/triggers/helm_releases_triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
FAILED_STATUSES = ["failed", "unknown"]
DEPLOYED_STATUSES = ["deployed"]


class IncomingHelmReleasesEventPayload(BaseModel):
"""
The format of incoming payloads containing helm release events. This is mostly used for deserialization.
Expand All @@ -33,8 +34,10 @@ def get_event_name(self) -> str:
return HelmReleasesTriggerEvent.__name__

def get_event_description(self) -> str:
return f"HelmReleases-{self.helm_release.namespace}/{self.helm_release.name}/{self.helm_release.chart.metadata.version}-" \
f"{self.helm_release.info.status}"
return (
f"HelmReleases-{self.helm_release.namespace}/{self.helm_release.name}/{self.helm_release.chart.metadata.version}-"
f"{self.helm_release.info.status}"
)


@dataclass
Expand Down Expand Up @@ -93,7 +96,9 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):
if event.helm_release.info.status in UNHEALTHY_STATUSES:
rate_limiter_id = f"{event.helm_release.namespace}:{event.helm_release.name}"
else:
rate_limiter_id = f"{event.helm_release.namespace}:{event.helm_release.name}-{last_deployed_utc.isoformat()}"
rate_limiter_id = (
f"{event.helm_release.namespace}:{event.helm_release.name}-{last_deployed_utc.isoformat()}"
)
# if the server start time is greater than the last deployement time of the release then dont fire the trigger
# eg: start -> 5pm, last_deployed -> 6pm, delta -> - 1hr. => fire
# start -> 3pm, last_deployed -> 2pm, delta -> 1hr. => dont_fire
Expand All @@ -111,8 +116,8 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):

return can_fire

def build_execution_event(
self, event: HelmReleasesTriggerEvent, sink_findings: Dict[str, List[Finding]]
def _build_execution_event(
self, event: HelmReleasesTriggerEvent, sink_findings: Dict[str, List[Finding]]
) -> Optional[ExecutionBaseEvent]:
if not isinstance(event, HelmReleasesTriggerEvent):
return
Expand All @@ -127,11 +132,13 @@ def get_execution_event_type() -> type:


class HelmReleaseUnhealthyTrigger(HelmReleaseBaseTrigger):
def __init__(self,
rate_limit: int,
names: List[str] = [],
namespace: str = None,
duration: int = 900, ):
def __init__(
self,
rate_limit: int,
names: List[str] = [],
namespace: str = None,
duration: int = 900,
):
super().__init__(
statuses=UNHEALTHY_STATUSES,
names=names,
Expand Down
2 changes: 1 addition & 1 deletion src/robusta/integrations/kubernetes/base_triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def __parse_kubernetes_objs(cls, k8s_payload: IncomingK8sEventPayload):
old_obj = cls.__load_hikaru_obj(k8s_payload.oldObj, model_class)
return obj, old_obj

def build_execution_event(
def _build_execution_event(
self, event: K8sTriggerEvent, sink_findings: Dict[str, List[Finding]]
) -> Optional[ExecutionBaseEvent]:
# we can't use self.get_execution_event_type() because for KubernetesAnyAllChangesTrigger we need to filter out
Expand Down
2 changes: 1 addition & 1 deletion src/robusta/integrations/prometheus/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):

return True

def build_execution_event(
def _build_execution_event(
self, event: PrometheusTriggerEvent, sink_findings: Dict[str, List[Finding]]
) -> Optional[ExecutionBaseEvent]:
return AlertEventBuilder.build_event(event, sink_findings)
Expand Down

0 comments on commit 112e68b

Please sign in to comment.