Skip to content

Commit

Permalink
Merge pull request robusta-dev#43 from robusta-dev/service_resolution…
Browse files Browse the repository at this point in the history
…_cache_miss

Fix service resolution cache miss
  • Loading branch information
arikalon1 authored Aug 28, 2021
2 parents 9c59b32 + 2490932 commit 69579ba
Show file tree
Hide file tree
Showing 18 changed files with 292 additions and 110 deletions.
1 change: 1 addition & 0 deletions deployment/base/config/.kubewatch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ resource:
replicationcontroller: true
replicaset: true
daemonset: true
statefulset: true
services: true
pod: true
job: true
Expand Down
2 changes: 1 addition & 1 deletion deployment/base/forwarder.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ spec:
containers:
- name: kubewatch
# this is a custom version of kubewatch built from https://github.com/aantn/kubewatch
image: us-central1-docker.pkg.dev/genuine-flight-317411/devel/kubewatch:v1.9
image: us-central1-docker.pkg.dev/genuine-flight-317411/devel/kubewatch:v1.10
imagePullPolicy: Always
env:
- name: KW_CONFIG
Expand Down
124 changes: 90 additions & 34 deletions scripts/generate_kubernetes_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,26 @@
from typing import TextIO

KUBERNETES_VERSIONS = ["v1", "v2beta1", "v2beta2"]
KUBERNETES_RESOURCES = ["Pod", "ReplicaSet", "DaemonSet", "Deployment", "Service", "ConfigMap", "Event", "HorizontalPodAutoscaler", "Node"]
KUBERNETES_RESOURCES = [
"Pod",
"ReplicaSet",
"DaemonSet",
"Deployment",
"StatefulSet",
"Service",
"ConfigMap",
"Event",
"HorizontalPodAutoscaler",
"Node",
]
TRIGGER_TYPES = {
"create": "K8sOperationType.CREATE",
"update": "K8sOperationType.UPDATE",
"delete": "K8sOperationType.DELETE",
"all_changes": "None",
}

CUSTOM_SUBCLASSES = {
"Pod": "RobustaPod",
"Deployment": "RobustaDeployment"
}
CUSTOM_SUBCLASSES = {"Pod": "RobustaPod", "Deployment": "RobustaDeployment"}
CUSTOM_SUBCLASSES_NAMES_STR = ",".join(CUSTOM_SUBCLASSES.values())

COMMON_PREFIX = """# This file was autogenerated. Do not edit.\n\n"""
Expand All @@ -30,62 +38,85 @@ def get_model_class(k8s_resource_name: str) -> str:

def autogenerate_events(f: TextIO):
f.write(COMMON_PREFIX)
f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
from dataclasses import dataclass
from typing import Union
from ..base_event import K8sBaseEvent
from ..custom_models import {CUSTOM_SUBCLASSES_NAMES_STR}
"""))
"""
)
)

for version in KUBERNETES_VERSIONS:
for resource in KUBERNETES_RESOURCES:
f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
from hikaru.model.rel_1_16.{version} import {resource} as {version}{resource}
"""))

"""
)
)

all_versioned_resources = set()
for resource in KUBERNETES_RESOURCES:
if resource in CUSTOM_SUBCLASSES:
model_class_str = get_model_class(resource)
all_versioned_resources.add(model_class_str)
else:
version_resources = [f"{version}{resource}" for version in KUBERNETES_VERSIONS]
version_resources = [
f"{version}{resource}" for version in KUBERNETES_VERSIONS
]
model_class_str = f"Union[{','.join(version_resources)}]"
all_versioned_resources = all_versioned_resources.union(set(version_resources))
all_versioned_resources = all_versioned_resources.union(
set(version_resources)
)

f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
@dataclass
class {resource}Event (K8sBaseEvent):
obj: {model_class_str}
old_obj: {model_class_str}
"""))
"""
)
)

# add the KubernetesAnyEvent
f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
@dataclass
class KubernetesAnyEvent (K8sBaseEvent):
obj: {f"Union[{','.join(all_versioned_resources)}]"}
old_obj: {f"Union[{','.join(all_versioned_resources)}]"}
"""))
"""
)
)

mappers = [f"'{r}': {r}Event" for r in KUBERNETES_RESOURCES]
mappers_str = ",\n ".join(mappers)
f.write(f"\nKIND_TO_EVENT_CLASS = {{\n {mappers_str}\n}}\n")


def autogenerate_models(f: TextIO, version : str):
def autogenerate_models(f: TextIO, version: str):
f.write(COMMON_PREFIX)
f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
from hikaru.model.rel_1_16.{version} import *
from ...custom_models import {CUSTOM_SUBCLASSES_NAMES_STR}
"""))
"""
)
)

mappers = [f"'{r}': {get_model_class(r)}" for r in KUBERNETES_RESOURCES]
mappers_str = ",\n ".join(mappers)
Expand All @@ -96,63 +127,88 @@ def autogenerate_versioned_models(f: TextIO):
f.write(COMMON_PREFIX)
for version in KUBERNETES_VERSIONS:

f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
from .{version}.models import KIND_TO_MODEL_CLASS as {version}
"""))
"""
)
)

mappers = [f"'{version}': {version}" for version in KUBERNETES_VERSIONS]
mappers_str = ",\n ".join(mappers)

f.write(f"VERSION_KIND_TO_MODEL_CLASS = {{\n {mappers_str}\n}}\n")
f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
def get_api_version(apiVersion: str):
if "/" in apiVersion:
apiVersion = apiVersion.split("/")[1]
return VERSION_KIND_TO_MODEL_CLASS.get(apiVersion)
"""))

"""
)
)


def autogenerate_triggers(f: TextIO):
f.write(COMMON_PREFIX)
f.write(textwrap.dedent("""\
f.write(
textwrap.dedent(
"""\
from ....utils.decorators import doublewrap
from ..base_triggers import register_k8s_playbook, register_k8s_any_playbook
from ..base_event import K8sOperationType
"""))
"""
)
)

for resource in KUBERNETES_RESOURCES:
f.write(f"# {resource} Triggers\n")
for trigger_name, operation_type in TRIGGER_TYPES.items():
f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
@doublewrap
def on_{resource.lower()}_{trigger_name}(func, name_prefix='', namespace_prefix=''):
return register_k8s_playbook(func, '{resource}', {operation_type}, name_prefix=name_prefix, namespace_prefix=namespace_prefix)
"""))
"""
)
)

f.write(f"# Kubernetes Any Triggers\n")
for trigger_name, operation_type in TRIGGER_TYPES.items():
f.write(textwrap.dedent(f"""\
f.write(
textwrap.dedent(
f"""\
@doublewrap
def on_kubernetes_any_resource_{trigger_name}(func, name_prefix='', namespace_prefix=''):
return register_k8s_any_playbook(func, {operation_type}, name_prefix=name_prefix, namespace_prefix=namespace_prefix)
"""))
"""
)
)


def main():
root_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
output_dir = os.path.join(root_dir, "src/robusta/integrations/kubernetes/autogenerated/")

parser = argparse.ArgumentParser(description='Autogenerate kubernetes models, events, and triggers')
parser.add_argument('-o', '--output', default=output_dir, type=str, help='output directory')
output_dir = os.path.join(
root_dir, "src/robusta/integrations/kubernetes/autogenerated/"
)

parser = argparse.ArgumentParser(
description="Autogenerate kubernetes models, events, and triggers"
)
parser.add_argument(
"-o", "--output", default=output_dir, type=str, help="output directory"
)
args = parser.parse_args()

# generate versioned events and models
Expand Down
67 changes: 54 additions & 13 deletions src/robusta/core/discovery/top_service_resolver.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,64 @@
import logging
import threading
import time
from collections import defaultdict
from typing import List

from ..model.services import ServiceInfo, get_service_key
from pydantic.main import BaseModel

from ..model.env_vars import SERVICE_UPDATES_CACHE_TTL_SEC
from ..model.services import ServiceInfo


class CachedServiceInfo(BaseModel):
    """A service entry paired with the time its add/update event was observed.

    Stored in TopServiceResolver's recent-updates map and discarded once the
    entry is older than SERVICE_UPDATES_CACHE_TTL_SEC (see store_cached_services).
    """

    service_info: ServiceInfo
    event_time: float  # epoch seconds (time.time()) when the event was recorded


class TopServiceResolver:
__recent_service_updates = {}
__namespace_to_service = defaultdict(list)
__cached_updates_lock = threading.Lock()

cached_services: List[ServiceInfo] = []
@classmethod
def store_cached_services(cls, services: List[ServiceInfo]):
    """Rebuild the namespace -> services cache from a periodic API-server read.

    *services* is the full list obtained from the API server; any recent
    event-driven updates that have not expired yet are merged in on top.
    """
    # Group the freshly-read services by namespace.
    new_store = defaultdict(list)
    for service in services:
        new_store[service.namespace].append(service)

    # The services are stored periodically, after reading it from the API server. If, between reads
    # new services are added, they will be missing from the cache. So, in addition to the periodic read, we
    # update the cache from listening to add/update API server events.
    # append recent updates, to avoid race conditions between events and api server read
    with cls.__cached_updates_lock:
        # Snapshot the keys so entries can be deleted while iterating.
        recent_updates_keys = list(cls.__recent_service_updates.keys())
        for service_key in recent_updates_keys:
            recent_update = cls.__recent_service_updates[service_key]
            if (
                time.time() - recent_update.event_time
                > SERVICE_UPDATES_CACHE_TTL_SEC
            ):
                # Expired: old enough that a periodic read should have seen it.
                del cls.__recent_service_updates[service_key]
            else:
                # Still fresh: may be missing from this read; keep it in the cache.
                new_store[recent_update.service_info.namespace].append(
                    recent_update.service_info
                )

    # Single reference assignment: readers see either the old or the new mapping.
    cls.__namespace_to_service = new_store

# TODO remove this guess function
# temporary try to guess who the owner service is.
@staticmethod
def guess_service_key(name: str, namespace: str) -> str:
for cached_service in TopServiceResolver.cached_services:
if cached_service.namespace == namespace and name.startswith(
cached_service.name
):
return get_service_key(
cached_service.name,
cached_service.service_type,
cached_service.namespace,
)
@classmethod
def guess_service_key(cls, name: str, namespace: str) -> str:
    """Best-effort guess of the owning service's key for resource *name*.

    Matches by prefix: a pod like "myapp-7d4b9-xyz" starts with its owner
    service's name "myapp". Returns "" when no cached service in the
    namespace matches.
    """
    # NOTE: __namespace_to_service is a defaultdict(list); an unknown
    # namespace yields an empty list (and inserts an empty entry).
    for cached_service in cls.__namespace_to_service[namespace]:
        if name.startswith(cached_service.name):
            return cached_service.get_service_key()
    return ""

@classmethod
def add_cached_service(cls, service: ServiceInfo):
    """Record a service seen in a live add/update API-server event."""
    # Make the service visible to lookups immediately, before the next
    # periodic store_cached_services pass.
    # NOTE(review): repeated update events for the same service will append
    # duplicates until the next periodic rebuild — verify this is acceptable.
    cls.__namespace_to_service[service.namespace].append(service)
    # Also record it as a recent update so it survives the next periodic
    # rebuild (until its TTL expires) even if the API read raced the event.
    with cls.__cached_updates_lock:
        cls.__recent_service_updates[service.get_service_key()] = CachedServiceInfo(
            service_info=service, event_time=time.time()
        )
4 changes: 4 additions & 0 deletions src/robusta/core/model/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@
GRAFANA_RENDERER_URL = os.environ.get(
    "GRAFANA_RENDERER_URL", "http://127.0.0.1:8281/render"
)
# TTL (seconds) for entries in TopServiceResolver's recent-updates cache.
# os.environ.get returns a str whenever the variable is set in the
# environment; coerce to int so the value has a consistent type and numeric
# comparisons against time deltas work either way.
SERVICE_UPDATES_CACHE_TTL_SEC = int(
    os.environ.get("SERVICE_UPDATES_CACHE_TTL_SEC", 120)
)
# Directory containing the built-in playbooks shipped inside the image.
INTERNAL_PLAYBOOKS_ROOT = os.environ.get(
    "INTERNAL_PLAYBOOKS_ROOT", "/app/robusta/core/playbooks/internal"
)
7 changes: 3 additions & 4 deletions src/robusta/core/model/services.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
from pydantic import BaseModel


def get_service_key(name: str, service_type: str, namespace: str) -> str:
    """Build the canonical service cache key: "<namespace>/<service_type>/<name>"."""
    return "/".join((namespace, service_type, name))


class ServiceInfo(BaseModel):
name: str
service_type: str
namespace: str
classification: str = "None"
deleted: bool = False

def get_service_key(self) -> str:
    """Return the canonical cache key for this service: "<namespace>/<service_type>/<name>"."""
    return "/".join((self.namespace, self.service_type, self.name))

def __eq__(self, other):
if not isinstance(other, ServiceInfo):
return NotImplemented
Expand Down
Empty file.
19 changes: 19 additions & 0 deletions src/robusta/core/playbooks/internal/discovery_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from robusta.api import *
from robusta.core.model.services import ServiceInfo
from robusta.core.discovery.top_service_resolver import TopServiceResolver


@on_kubernetes_any_resource_all_changes
def cluster_discovery_updates(event: KubernetesAnyEvent):
    """Keep TopServiceResolver's cache in sync with live cluster events.

    When a top-level workload (one without ownerReferences) is created or
    updated, register it as a cached service so service resolution sees it
    before the next periodic discovery pass.
    """
    if event.operation not in (K8sOperationType.CREATE, K8sOperationType.UPDATE):
        return
    if event.obj.kind not in ("Deployment", "ReplicaSet", "DaemonSet", "StatefulSet"):
        return
    if event.obj.metadata.ownerReferences:
        # Owned resources (e.g. a ReplicaSet managed by a Deployment) are not
        # top-level services themselves.
        return

    service = ServiceInfo(
        name=event.obj.metadata.name,
        service_type=event.obj.kind,
        namespace=event.obj.metadata.namespace,
    )
    TopServiceResolver.add_cached_service(service)
Loading

0 comments on commit 69579ba

Please sign in to comment.