Skip to content

Commit

Permalink
Refactor to prevent cyclic dependency (happened when importing the ro…
Browse files Browse the repository at this point in the history
…busta package on the relay) (robusta-dev#514)

Fix jobs cache initialization - Init only once, even on clusters with no jobs
Fix silence URL building - Build the URL without making an HTTP call (the call failed for users behind an outgoing firewall)
  • Loading branch information
arikalon1 authored Sep 5, 2022
1 parent 88321ba commit 29c70a0
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 57 deletions.
47 changes: 2 additions & 45 deletions src/robusta/core/discovery/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
V1DeploymentList, V1ObjectMeta, V1StatefulSetList, V1DaemonSetList, \
V1ReplicaSetList, V1PodList, V1NodeList, V1JobList, V1Container, V1Volume

from . import utils
from ..model.jobs import JobInfo
from ...core.model.services import ServiceInfo, ContainerInfo, VolumeInfo, ServiceConfig
from ...core.model.pods import PodResources, ResourceAttributes, ContainerResources


class DiscoveryResults:
Expand Down Expand Up @@ -105,7 +105,7 @@ def discovery_process() -> DiscoveryResults:
for pod in pod_items:
pod_status = pod.status.phase
if pod_status in ["Running", "Unknown", "Pending"] and pod.spec.node_name:
node_requests[pod.spec.node_name].append(k8s_pod_requests(pod))
node_requests[pod.spec.node_name].append(utils.k8s_pod_requests(pod))

except Exception:
logging.error(
Expand Down Expand Up @@ -178,46 +178,3 @@ def extract_volumes(resource) -> List[V1Volume]:
except Exception: # may fail if one of the attributes is None
logging.error(f"Failed to extract volumes from {resource}", exc_info=True)
return []

def k8s_pod_requests(pod: V1Pod) -> PodResources:
    """
    Build the PodResources describing the `requests` of a pod.

    The pod is expected to be a kubernetes python client object (not hikaru).
    """
    requested = ResourceAttributes.requests
    return __pod_resources(pod, requested)


def __pod_resources(pod: V1Pod, resource_attribute: ResourceAttributes) -> PodResources:
    """
    Aggregate the given resource attribute over all containers of a pod.

    :param pod: pod whose `spec.containers` are aggregated
    :param resource_attribute: which attribute to read from each container
    :return: PodResources carrying the pod name and the aggregated cpu / memory
    """
    totals = containers_resources_sum(pod.spec.containers, resource_attribute)
    pod_name = pod.metadata.name
    return PodResources(pod_name=pod_name, cpu=totals.cpu, memory=totals.memory)


def containers_resources_sum(
    containers: List[V1Container], resource_attribute: ResourceAttributes
) -> ContainerResources:
    """
    Add up the cpu and memory of every container in the list.

    :param containers: containers to aggregate
    :param resource_attribute: which attribute to read from each container
    :return: ContainerResources holding the accumulated cpu and memory
    """
    total_cpu, total_mem = 0.0, 0
    for item in containers:
        usage = container_resources(item, resource_attribute)
        # Plain left-to-right accumulation, starting from 0.0 (cpu) and 0 (memory).
        total_cpu, total_mem = total_cpu + usage.cpu, total_mem + usage.memory

    return ContainerResources(cpu=total_cpu, memory=total_mem)


def container_resources(container: V1Container, resource_attribute: ResourceAttributes) -> ContainerResources:
    """
    Read the cpu and memory of a single container for the given resource attribute.

    :param container: container whose `resources` field is inspected
    :param resource_attribute: its `name` selects the field read from `resources` (requests or limits)
    :return: ContainerResources with the parsed values, or cpu=0.0 / memory=0
             when the container has no resources set
    """
    spec = container.resources
    if not spec:
        # Nothing declared on the container: report zeros.
        return ContainerResources(cpu=0.0, memory=0)

    # The selected field may be None; treat that as an empty mapping.
    wanted = getattr(spec, resource_attribute.name) or {}  # requests or limits
    cpu_value = PodResources.parse_cpu(wanted.get("cpu", 0.0))
    mem_value = PodResources.parse_mem(wanted.get("memory", "0Mi"))
    return ContainerResources(cpu=cpu_value, memory=mem_value)
48 changes: 48 additions & 0 deletions src/robusta/core/discovery/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import List
from kubernetes.client import V1ResourceRequirements, V1Container, V1Pod

from ...core.model.pods import PodResources, ResourceAttributes, ContainerResources


def k8s_pod_requests(pod: V1Pod) -> PodResources:
    """
    Build the PodResources describing the `requests` of a pod.

    The pod is expected to be a kubernetes python client object (not hikaru).
    """
    requested = ResourceAttributes.requests
    return __pod_resources(pod, requested)


def __pod_resources(pod: V1Pod, resource_attribute: ResourceAttributes) -> PodResources:
    """
    Aggregate the given resource attribute over all containers of a pod.

    :param pod: pod whose `spec.containers` are aggregated
    :param resource_attribute: which attribute to read from each container
    :return: PodResources carrying the pod name and the aggregated cpu / memory
    """
    totals = containers_resources_sum(pod.spec.containers, resource_attribute)
    pod_name = pod.metadata.name
    return PodResources(pod_name=pod_name, cpu=totals.cpu, memory=totals.memory)


def containers_resources_sum(
    containers: List[V1Container], resource_attribute: ResourceAttributes
) -> ContainerResources:
    """
    Add up the cpu and memory of every container in the list.

    :param containers: containers to aggregate
    :param resource_attribute: which attribute to read from each container
    :return: ContainerResources holding the accumulated cpu and memory
    """
    total_cpu, total_mem = 0.0, 0
    for item in containers:
        usage = container_resources(item, resource_attribute)
        # Plain left-to-right accumulation, starting from 0.0 (cpu) and 0 (memory).
        total_cpu, total_mem = total_cpu + usage.cpu, total_mem + usage.memory

    return ContainerResources(cpu=total_cpu, memory=total_mem)


def container_resources(container: V1Container, resource_attribute: ResourceAttributes) -> ContainerResources:
    """
    Read the cpu and memory of a single container for the given resource attribute.

    :param container: container whose `resources` field is inspected
    :param resource_attribute: its `name` selects the field read from `resources` (requests or limits)
    :return: ContainerResources with the parsed values, or cpu=0.0 / memory=0
             when the container has no resources set
    """
    spec = container.resources
    if not spec:
        # Nothing declared on the container: report zeros.
        return ContainerResources(cpu=0.0, memory=0)

    # The selected field may be None; treat that as an empty mapping.
    wanted = getattr(spec, resource_attribute.name) or {}  # requests or limits
    cpu_value = PodResources.parse_cpu(wanted.get("cpu", 0.0))
    mem_value = PodResources.parse_mem(wanted.get("memory", "0Mi"))
    return ContainerResources(cpu=cpu_value, memory=mem_value)
8 changes: 4 additions & 4 deletions src/robusta/core/model/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import List, Dict, Optional

from .pods import ContainerResources, ResourceAttributes
from ...core.discovery import discovery
from ...core.discovery import utils

SERVICE_TYPE_JOB = "Job"

Expand All @@ -17,8 +17,8 @@ class JobContainer(BaseModel):

@staticmethod
def from_api_server(container: V1Container) -> "JobContainer":
requests: ContainerResources = discovery.container_resources(container, ResourceAttributes.requests)
limits: ContainerResources = discovery.container_resources(container, ResourceAttributes.limits)
requests: ContainerResources = utils.container_resources(container, ResourceAttributes.requests)
limits: ContainerResources = utils.container_resources(container, ResourceAttributes.limits)
return JobContainer(
image=container.image,
cpu_req=requests.cpu,
Expand Down Expand Up @@ -127,7 +127,7 @@ def from_db_row(job: dict) -> "JobInfo":
@staticmethod
def from_api_server(job: V1Job, pods: List[str]) -> "JobInfo":
containers = job.spec.template.spec.containers
requests: ContainerResources = discovery.containers_resources_sum(containers, ResourceAttributes.requests)
requests: ContainerResources = utils.containers_resources_sum(containers, ResourceAttributes.requests)
status = JobStatus.from_api_server(job)
job_data = JobData.from_api_server(job, pods)
return JobInfo(
Expand Down
8 changes: 4 additions & 4 deletions src/robusta/core/reporting/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import urllib.parse
import uuid
import re
from datetime import datetime
Expand All @@ -9,7 +10,7 @@
from ..model.env_vars import ROBUSTA_UI_DOMAIN
from ..reporting.consts import FindingSubjectType, FindingSource, FindingType
from ...core.discovery.top_service_resolver import TopServiceResolver
from requests import get


class BaseBlock(BaseModel):
hidden: bool = False
Expand Down Expand Up @@ -184,11 +185,10 @@ def get_prometheus_silence_url(self, cluster_id: str) -> str:
if self.subject.namespace:
labels["namespace"] = self.subject.namespace

kind: str = self.subject.subject_type.value
kind: str = str(self.subject.subject_type.value)
if kind and self.subject.name:
labels[kind] = self.subject.name

labels["referer"] = "sink"

uri = get(f"{ROBUSTA_UI_DOMAIN}/silences/create", labels)
return uri.url
return f"{ROBUSTA_UI_DOMAIN}/silences/create?{urllib.parse.urlencode(labels)}"
11 changes: 7 additions & 4 deletions src/robusta/core/sinks/robusta/robusta_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import time
import threading
from typing import List, Dict
from typing import List, Dict, Optional
from kubernetes.client import V1NodeList, V1Node, V1NodeCondition, V1Taint

from ...discovery.discovery import Discovery, DiscoveryResults
Expand Down Expand Up @@ -55,7 +55,9 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
self.__discovery_period_sec = DISCOVERY_PERIOD_SEC
self.__services_cache: Dict[str, ServiceInfo] = {}
self.__nodes_cache: Dict[str, NodeInfo] = {}
self.__jobs_cache: Dict[str, JobInfo] = {}
# Some clusters have no jobs. Initializing the jobs cache to None, rather than to an empty dict,
# lets us tell "no jobs" apart from "not initialized yet"
self.__jobs_cache: Optional[Dict[str, JobInfo]] = None
self.__init_service_resolver()
self.__thread = threading.Thread(target=self.__discover_cluster)
self.__thread.start()
Expand Down Expand Up @@ -100,15 +102,16 @@ def __assert_node_cache_initialized(self):
self.__nodes_cache[node.name] = node

def __assert_jobs_cache_initialized(self):
if not self.__jobs_cache:
if self.__jobs_cache is None:
logging.info("Initializing jobs cache")
self.__jobs_cache: Dict[str, JobInfo] = {}
for job in self.dal.get_active_jobs():
self.__jobs_cache[job.get_service_key()] = job

def __reset_caches(self):
self.__services_cache: Dict[str, ServiceInfo] = {}
self.__nodes_cache: Dict[str, NodeInfo] = {}
self.__jobs_cache: Dict[str, JobInfo] = {}
self.__jobs_cache = None

def stop(self):
self.__active = False
Expand Down

0 comments on commit 29c70a0

Please sign in to comment.