Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi resource trigger #1212

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions docs/playbook-reference/triggers/kubernetes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,57 @@ Low-level Triggers
Low-level triggers fire on the raw creation, deletion, and modification of resources in your cluster. They can be noisy
compared to other triggers, as they fire on even the smallest change to a resource.

Multi-Resource Triggers
-------------------------

.. _on_kubernetes_resource_operation:

.. details:: on_kubernetes_resource_operation

``on_kubernetes_resource_operation`` fires when one of the specified operations occurs on one of the specified resources.

* ``operations``: List of `operations`. If empty, all `operations` are included. Options:
* ``create``
* ``update``
* ``delete``
* ``resources``: List of Kubernetes `resources`. If empty, all `resources` are included. Options:
* ``deployment``
* ``pod``
* ``job``
* ``node``
* ``replicaset``
* ``statefulset``
* ``daemonset``
* ``ingress``
* ``service``
* ``event``
* ``horizontalpodautoscaler``
* ``clusterrole``
* ``clusterrolebinding``
* ``namespace``
* ``serviceaccount``
* ``persistentvolume``
* ``configmap``

Example playbook:

.. code-block:: yaml

customPlaybooks:
- triggers:
- on_kubernetes_resource_operation:
resources: ["deployment"]
operations: ["update"]
actions:
- create_finding:
title: "Deployment $name on namespace $namespace updated"
aggregation_key: "Deployment Update"



Single Resource Triggers
-------------------------

.. jinja::
:inline-ctx: { "resource_name" : "Pod", "related_actions" : ["Pod Enrichers (General)", "pod_events_enricher"] }
:header_update_levels:
Expand Down
14 changes: 4 additions & 10 deletions helm/robusta/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,8 @@ platformPlaybooks:
sinks:
- "robusta_ui_sink"
- triggers:
- on_deployment_all_changes: {}
- on_daemonset_all_changes: {}
- on_statefulset_all_changes: {}
- on_kubernetes_resource_operation:
resources: ["deployment", "daemonset", "statefulset"]
actions:
- resource_babysitter: {}
sinks:
Expand All @@ -400,13 +399,8 @@ platformPlaybooks:
sinks:
- "robusta_ui_sink"
- triggers:
- on_deployment_all_changes: {}
- on_daemonset_all_changes: {}
- on_statefulset_all_changes: {}
- on_replicaset_all_changes: {}
- on_pod_all_changes: {}
- on_node_all_changes: {}
- on_job_all_changes: {}
- on_kubernetes_resource_operation:
resources: ["deployment", "replicaset", "daemonset", "statefulset", "pod", "node", "job" ]
actions:
- resource_events_diff: {}
- triggers:
Expand Down
3 changes: 2 additions & 1 deletion src/robusta/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import certifi
import typer
import yaml
from hikaru.model.rel_1_26 import Container, Job, JobSpec, ObjectMeta, PodSpec, PodTemplateSpec
from hikaru.model.rel_1_26 import Container, Job, JobSpec, ObjectMeta, PodSpec, PodTemplateSpec, SecurityContext
from kubernetes import client, config
from pydantic import BaseModel, Extra

Expand Down Expand Up @@ -501,6 +501,7 @@ def demo_alert(
name="alert-curl",
image=image,
command=command,
securityContext=SecurityContext(runAsUser=2000),
)
],
restartPolicy="Never",
Expand Down
7 changes: 7 additions & 0 deletions src/robusta/core/discovery/top_service_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,10 @@ def add_cached_resource(cls, resource: TopLevelResource):
cls.__recent_resource_updates[resource.get_resource_key()] = CachedResourceInfo(
resource=resource, event_time=time.time()
)

@classmethod
arikalon1 marked this conversation as resolved.
Show resolved Hide resolved
def remove_cached_resource(cls, resource: TopLevelResource):
arikalon1 marked this conversation as resolved.
Show resolved Hide resolved
if resource in cls.__namespace_to_resource[resource.namespace]:
cls.__namespace_to_resource[resource.namespace].remove(resource)
with cls.__cached_updates_lock:
cls.__recent_resource_updates.pop(resource.get_resource_key(), None)
1 change: 0 additions & 1 deletion src/robusta/core/model/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def load_bool(env_var, default: bool):
DEFAULT_TIMEZONE = pytz.timezone(os.environ.get("DEFAULT_TIMEZONE", "UTC"))
NUM_EVENT_THREADS = int(os.environ.get("NUM_EVENT_THREADS", 20))
INCOMING_EVENTS_QUEUE_MAX_SIZE = int(os.environ.get("INCOMING_EVENTS_QUEUE_MAX_SIZE", 500))
EVENT_PARSING_WORKERS = int(os.environ.get("EVENT_PARSING_WORKERS", 1))

FLOAT_PRECISION_LIMIT = int(os.environ.get("FLOAT_PRECISION_LIMIT", 11))

Expand Down
26 changes: 4 additions & 22 deletions src/robusta/core/playbooks/base_trigger.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import abc
from concurrent.futures.process import ProcessPoolExecutor
from typing import Dict, List, Optional, Type
from typing import Any, Dict, List, Optional, Type

from pydantic import BaseModel

from robusta.core.model.env_vars import EVENT_PARSING_WORKERS
from robusta.core.model.events import ExecutionBaseEvent
from robusta.core.reporting.base import Finding
from robusta.patch.patch import create_monkey_patches
from robusta.utils.documented_pydantic import DocumentedModel


Expand All @@ -22,32 +19,17 @@ def get_event_description(self) -> str:
return "NA"


build_execution_event_process_pool = ProcessPoolExecutor(max_workers=EVENT_PARSING_WORKERS)


class BaseTrigger(DocumentedModel):
def get_trigger_event(self) -> str:
pass

def should_fire(self, event: TriggerEvent, playbook_id: str):
def should_fire(self, event: TriggerEvent, playbook_id: str, build_context: Dict[str, Any]):
arikalon1 marked this conversation as resolved.
Show resolved Hide resolved
return True

def build_execution_event(
self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]]
) -> Optional[ExecutionBaseEvent]:
return build_execution_event_process_pool.submit(
self._build_execution_event_trampoline, event, sink_findings
).result()

def _build_execution_event_trampoline(self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]]):
create_monkey_patches()
return self._build_execution_event(event, sink_findings)

def _build_execution_event(
self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]]
self, event: TriggerEvent, sink_findings: Dict[str, List[Finding]], build_context: Dict[str, Any]
) -> Optional[ExecutionBaseEvent]:
# This is meant for running in a separate process
raise NotImplementedError
pass

@staticmethod
def get_execution_event_type() -> Type[ExecutionBaseEvent]:
Expand Down
17 changes: 9 additions & 8 deletions src/robusta/core/playbooks/internal/discovery_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
@action
def cluster_discovery_updates(event: KubernetesAnyChangeEvent):
arikalon1 marked this conversation as resolved.
Show resolved Hide resolved
if (
event.operation in [K8sOperationType.CREATE, K8sOperationType.UPDATE]
and event.obj.kind in ["Deployment", "ReplicaSet", "DaemonSet", "StatefulSet", "Pod", "Job"]
event.obj.kind in ["Deployment", "ReplicaSet", "DaemonSet", "StatefulSet", "Pod", "Job"]
and not event.obj.metadata.ownerReferences
):
TopServiceResolver.add_cached_resource(
TopLevelResource(
name=event.obj.metadata.name,
resource_type=event.obj.kind,
namespace=event.obj.metadata.namespace,
)
resource = TopLevelResource(
name=event.obj.metadata.name,
resource_type=event.obj.kind,
namespace=event.obj.metadata.namespace,
)
if event.operation in [K8sOperationType.CREATE, K8sOperationType.UPDATE]:
TopServiceResolver.add_cached_resource(resource)
elif event.operation == K8sOperationType.DELETE:
TopServiceResolver.remove_cached_resource(resource)


@action
Expand Down
14 changes: 10 additions & 4 deletions src/robusta/core/playbooks/playbooks_event_handler_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from robusta.core.reporting import MarkdownBlock
from robusta.core.reporting.base import Finding
from robusta.core.reporting.consts import SYNC_RESPONSE_SINK
from robusta.core.sinks.robusta import RobustaSink
from robusta.core.sinks.robusta.dal.model_conversion import ModelConversion
from robusta.model.alert_relabel_config import AlertRelabel
from robusta.model.config import Registry
Expand Down Expand Up @@ -46,12 +45,13 @@ def handle_trigger(self, trigger_event: TriggerEvent) -> Optional[Dict[str, Any]
execution_response = None
execution_event: Optional[ExecutionBaseEvent] = None
sink_findings: Dict[str, List[Finding]] = defaultdict(list)
build_context: Dict[str, Any] = {}
for playbook in playbooks:
fired_trigger = self.__get_fired_trigger(trigger_event, playbook.triggers, playbook.get_id())
fired_trigger = self.__get_fired_trigger(trigger_event, playbook.triggers, playbook.get_id(), build_context)
if fired_trigger:
execution_event = None
try:
execution_event = fired_trigger.build_execution_event(trigger_event, sink_findings)
execution_event = fired_trigger.build_execution_event(trigger_event, sink_findings, build_context)
# sink_findings needs to be shared between playbooks.
# Attach the shared mapping to the newly built event so every playbook accumulates findings in the same place.
execution_event.sink_findings = sink_findings
Expand Down Expand Up @@ -281,9 +281,10 @@ def __get_fired_trigger(
trigger_event: TriggerEvent,
playbook_triggers: List[Trigger],
playbook_id: str,
build_context: Dict[str, Any],
) -> Optional[BaseTrigger]:
for trigger in playbook_triggers:
if trigger.get().should_fire(trigger_event, playbook_id):
if trigger.get().should_fire(trigger_event, playbook_id, build_context):
return trigger.get()
return None

Expand All @@ -307,6 +308,11 @@ def __handle_findings(self, execution_event: ExecutionBaseEvent):
# Each sink has a different findings, but enrichments are shared
finding_copy = copy.deepcopy(finding)
sink.write_finding(finding_copy, self.registry.get_sinks().platform_enabled)

sink_info = sinks_info[sink_name]
sink_info.type = sink.__class__.__name__
sink_info.findings_count += 1

if sink.params.stop:
return

Expand Down
2 changes: 2 additions & 0 deletions src/robusta/core/triggers/custom_triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
WarningEventUpdateTrigger,
)
from robusta.core.triggers.job_failed_trigger import JobFailedTrigger
from robusta.core.triggers.multi_resoources_trigger import MultiResourceTrigger
from robusta.core.triggers.pod_crash_loop_trigger import PodCrashLoopTrigger
from robusta.core.triggers.pod_image_pull_backoff import PodImagePullBackoffTrigger
from robusta.core.triggers.pod_oom_killed_trigger import PodOOMKilledTrigger
Expand All @@ -25,3 +26,4 @@ class CustomTriggers(BaseModel):
on_job_failure: Optional[JobFailedTrigger]
on_pod_oom_killed: Optional[PodOOMKilledTrigger]
on_container_oom_killed: Optional[ContainerOOMKilledTrigger]
on_kubernetes_resource_operation: Optional[MultiResourceTrigger]
14 changes: 7 additions & 7 deletions src/robusta/core/triggers/error_event_trigger.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import Any, Dict, List

from robusta.core.discovery.top_service_resolver import TopServiceResolver
from robusta.core.playbooks.base_trigger import TriggerEvent
Expand Down Expand Up @@ -33,25 +33,25 @@ def __init__(
self.exclude = exclude
self.include = include

def should_fire(self, event: TriggerEvent, playbook_id: str):
should_fire = super().should_fire(event, playbook_id)
def should_fire(self, event: TriggerEvent, playbook_id: str, build_context: Dict[str, Any]):
should_fire = super().should_fire(event, playbook_id, build_context)
if not should_fire:
return should_fire

if not isinstance(event, K8sTriggerEvent):
return False

exec_event = self.build_execution_event(event, {})
if event.k8s_payload.obj.get("type", None) != "Warning":
return False

exec_event = self.build_execution_event(event, {}, build_context)

if not isinstance(exec_event, EventChangeEvent):
return False

if not exec_event.obj or not exec_event.obj.regarding:
return False

if exec_event.get_event().type != "Warning":
return False

if self.operations and exec_event.operation.value not in self.operations:
return False

Expand Down
10 changes: 5 additions & 5 deletions src/robusta/core/triggers/helm_releases_triggers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys
from typing import Optional, List, Dict
from datetime import datetime
from typing import Any, Dict, List, Optional

import pytz
from attr import dataclass
from pydantic import BaseModel
Expand Down Expand Up @@ -68,7 +68,7 @@ class HelmReleaseBaseTrigger(BaseTrigger):
def get_trigger_event(self):
return HelmReleasesTriggerEvent.__name__

def should_fire(self, event: TriggerEvent, playbook_id: str):
def should_fire(self, event: TriggerEvent, playbook_id: str, build_context: Dict[str, Any]):
if not isinstance(event, HelmReleasesTriggerEvent):
return False

Expand Down Expand Up @@ -116,8 +116,8 @@ def should_fire(self, event: TriggerEvent, playbook_id: str):

return can_fire

def _build_execution_event(
self, event: HelmReleasesTriggerEvent, sink_findings: Dict[str, List[Finding]]
def build_execution_event(
self, event: HelmReleasesTriggerEvent, sink_findings: Dict[str, List[Finding]], build_context: Dict[str, Any]
) -> Optional[ExecutionBaseEvent]:
if not isinstance(event, HelmReleasesTriggerEvent):
return
Expand Down
8 changes: 5 additions & 3 deletions src/robusta/core/triggers/job_failed_trigger.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, Dict

from hikaru.model.rel_1_26 import Job

from robusta.core.playbooks.base_trigger import TriggerEvent
Expand All @@ -22,15 +24,15 @@ def __init__(
labels_selector=labels_selector,
)

def should_fire(self, event: TriggerEvent, playbook_id: str):
should_fire = super().should_fire(event, playbook_id)
def should_fire(self, event: TriggerEvent, playbook_id: str, build_context: Dict[str, Any]):
should_fire = super().should_fire(event, playbook_id, build_context)
if not should_fire:
return should_fire

if not isinstance(event, K8sTriggerEvent):
return False

exec_event = self.build_execution_event(event, {})
exec_event = self.build_execution_event(event, {}, build_context)

if not isinstance(exec_event, JobChangeEvent):
return False
Expand Down
Loading
Loading