Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kubernetes_state] refactor gauge submission, fix container.restarts #3297

Merged
merged 1 commit into from
Apr 5, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 54 additions & 136 deletions utils/kubernetes/kube_state_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

NAMESPACE = 'kubernetes_state'

# message.type is the index in this array
# see: https://github.com/prometheus/client_model/blob/model-0.0.2/metrics.proto#L24-L28
METRIC_TYPES = ['counter', 'gauge']


class KubeStateProcessor:
def __init__(self, kubernetes_check):
self.kube_check = kubernetes_check
Expand All @@ -20,13 +25,38 @@ def __init__(self, kubernetes_check):
# 'unknown': AgentCheck.UNKNOWN
}

# these metrics will be extracted with all their labels and reported as-is with their corresponding metric name
self.metric_to_gauge = {
# message.metric: datadog metric name
'kube_node_status_capacity_cpu_cores': NAMESPACE + '.node.cpu_capacity',
'kube_node_status_capacity_memory_bytes': NAMESPACE + '.node.memory_capacity',
'kube_node_status_capacity_pods': NAMESPACE + '.node.pods_capacity',
'kube_node_status_allocatable_cpu_cores': NAMESPACE + '.node.cpu_allocatable',
'kube_node_status_allocatable_memory_bytes': NAMESPACE + '.node.memory_allocatable',
'kube_node_status_allocatable_pods': NAMESPACE + '.node.pods_allocatable',
'kube_deployment_status_replicas_available': NAMESPACE + '.deployment.replicas_available',
'kube_deployment_status_replicas_unavailable': NAMESPACE + '.deployment.replicas_unavailable',
'kube_deployment_status_replicas_updated': NAMESPACE + '.deployment.replicas_updated',
'kube_deployment_spec_replicas': NAMESPACE + '.deployment.replicas_desired',
'kube_pod_container_resource_requests_cpu_cores': NAMESPACE + '.container.cpu_requested',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CI check fail on .container.cpu_requested, looks like the valid k8s metric name is : kube_pod_container_requested_cpu_cores

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what makes the CI fail is that we need to merge the protobuf.bin fixture, otherwise the metric doesn't exist. Tests pass locally when I copy the files.

'kube_pod_container_resource_requests_memory_bytes': NAMESPACE + '.container.memory_requested',
'kube_pod_container_resource_limits_cpu_cores': NAMESPACE + '.container.cpu_limit',
'kube_pod_container_resource_limits_memory_bytes': NAMESPACE + '.container.memory_limit',
'kube_pod_container_status_restarts': NAMESPACE + '.container.restarts'
}

def process(self, message, **kwargs):
"""
Search this class for a method with the same name of the message and
invoke it. Log some info if method was not found.
Handle a message according to the following flow:
- search self.metric_to_gauge for a prometheus.metric <--> datadog.metric mapping
- call check method with the same name as the metric
- log some info if none of the above worked
"""
try:
getattr(self, message.name)(message, **kwargs)
if message.name in self.metric_to_gauge:
self._submit_gauges(self.metric_to_gauge[message.name], message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We loose the label argument if passed to process(). Should we keep it in _submit_gauges or just remove the corresponding code branch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i feel like it could be useful someday, but yeah let's kill it for now since it's not used.

else:
getattr(self, message.name)(message, **kwargs)
except AttributeError:
self.log.debug("Unable to handle metric: {}".format(message.name))

Expand Down Expand Up @@ -72,140 +102,25 @@ def _extract_label_value(self, name, labels):
return label.value
return None

def kube_node_status_capacity_cpu_cores(self, message, **kwargs):
""" The total CPU resources of the node. """
metric_name = NAMESPACE + '.node.cpu_capacity'
for metric in message.metric:
val = metric.gauge.value
tags = ['node:{}'.format(self._extract_label_value("node", metric.label))]
self.gauge(metric_name, val, tags)

def kube_node_status_capacity_memory_bytes(self, message, **kwargs):
""" The total memory resources of the node. """
metric_name = NAMESPACE + '.node.memory_capacity'
for metric in message.metric:
val = metric.gauge.value
tags = ['node:{}'.format(self._extract_label_value("node", metric.label))]
self.gauge(metric_name, val, tags)

def kube_node_status_capacity_pods(self, message, **kwargs):
""" The total pod resources of the node. """
metric_name = NAMESPACE + '.node.pods_capacity'
for metric in message.metric:
val = metric.gauge.value
tags = ['node:{}'.format(self._extract_label_value("node", metric.label))]
self.gauge(metric_name, val, tags)

def kube_node_status_allocatable_cpu_cores(self, message, **kwargs):
""" The CPU resources of a node that are available for scheduling. """
metric_name = NAMESPACE + '.node.cpu_allocatable'
for metric in message.metric:
val = metric.gauge.value
tags = ['node:{}'.format(self._extract_label_value("node", metric.label))]
self.gauge(metric_name, val, tags)

def kube_node_status_allocatable_memory_bytes(self, message, **kwargs):
""" The memory resources of a node that are available for scheduling. """
metric_name = NAMESPACE + '.node.memory_allocatable'
for metric in message.metric:
val = metric.gauge.value
tags = ['node:{}'.format(self._extract_label_value("node", metric.label))]
self.gauge(metric_name, val, tags)

def kube_node_status_allocatable_pods(self, message, **kwargs):
""" The pod resources of a node that are available for scheduling. """
metric_name = NAMESPACE + '.node.pods_allocatable'
for metric in message.metric:
val = metric.gauge.value
tags = ['node:{}'.format(self._extract_label_value("node", metric.label))]
self.gauge(metric_name, val, tags)

def kube_deployment_status_replicas_available(self, message, **kwargs):
""" The number of available replicas per deployment. """
metric_name = NAMESPACE + '.deployment.replicas_available'
for metric in message.metric:
val = metric.gauge.value
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
self.gauge(metric_name, val, tags)

def kube_deployment_status_replicas_unavailable(self, message, **kwargs):
""" The number of unavailable replicas per deployment. """
metric_name = NAMESPACE + '.deployment.replicas_unavailable'
for metric in message.metric:
val = metric.gauge.value
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
self.gauge(metric_name, val, tags)

def kube_deployment_status_replicas_updated(self, message, **kwargs):
""" The number of updated replicas per deployment. """
metric_name = NAMESPACE + '.deployment.replicas_updated'
for metric in message.metric:
val = metric.gauge.value
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
self.gauge(metric_name, val, tags)

def kube_deployment_spec_replicas(self, message, **kwargs):
""" Number of desired pods for a deployment. """
metric_name = NAMESPACE + '.deployment.replicas_desired'
for metric in message.metric:
val = metric.gauge.value
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
self.gauge(metric_name, val, tags)

# Labels attached: namespace, pod, container, node
def kube_pod_container_requested_cpu_cores(self, message, **kwargs):
""" CPU cores requested for a container in a pod. """
metric_name = NAMESPACE + '.container.cpu_requested'
for metric in message.metric:
val = metric.gauge.value
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
#TODO: add deployment/replicaset?
self.gauge(metric_name, val, tags)

# Labels attached: namespace, pod, container, node
def kube_pod_container_requested_memory_bytes(self, message, **kwargs):
""" Memory bytes requested for a container in a pod. """
metric_name = NAMESPACE + '.container.memory_requested'
for metric in message.metric:
val = metric.gauge.value
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
self.gauge(metric_name, val, tags)

# TODO: Uncomment after kube-state-metrics 0.4 is released
# Labels attached: namespace, pod, container, node
# def kube_pod_container_limits_cpu_cores(self, message, **kwargs):
# """ CPU cores limit for a container in a pod. """
# metric_name = NAMESPACE + '.container.cpu_limit'
# for metric in message.metric:
# val = metric.gauge.value
# tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
# #TODO: add deployment/replicaset?
# self.gauge(metric_name, val, tags)

# TODO: Uncomment after kube-state-metrics 0.4 is released
# Labels attached: namespace, pod, container, node
# def kube_pod_container_limits_memory_bytes(self, message, **kwargs):
# """ Memory byte limit for a container in a pod. """
# metric_name = NAMESPACE + '.container.memory_limit'
# for metric in message.metric:
# val = metric.gauge.value
# tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
# self.gauge(metric_name, val, tags)
def _submit_gauges(self, metric_name, message):
"""
For each metric in the message, report it as a gauge with all labels as tags
except if a labels dict is passed, in which case keys are label names we'll extract
and corresponding values are tag names we'll use (eg: {'node': 'node'})
"""
if message.type < len(METRIC_TYPES):
for metric in message.metric:
val = getattr(metric, METRIC_TYPES[message.type]).value
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
self.gauge(metric_name, val, tags)
else:
self.log.error("Metric type %s unsupported for metric %s." % (message.type, message.name))

# TODO: implement kube_pod_container_status_ready
# TODO: implement kube_pod_container_status_running
# TODO: implement kube_pod_container_status_terminated
# TODO: implement kube_pod_container_status_waiting

# Labels attached: namespace, pod, container
def kube_pod_container_status_restarts(self, message, **kwargs):
""" Number of desired pods for a deployment. """
metric_name = NAMESPACE + '.container.restarts'
for metric in message.metric:
val = metric.gauge.value
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
self.gauge(metric_name, val, tags)

# TODO: implement kube_pod_info
# TODO: implement kube_pod_status_ready
# TODO: implement kube_pod_status_scheduled
Expand Down Expand Up @@ -258,8 +173,11 @@ def kube_node_spec_unschedulable(self, message, **kwargs):
""" Whether a node can schedule new pods. """
metric_name = NAMESPACE + '.node.status'
statuses = ('schedulable', 'unschedulable')
for metric in message.metric:
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
status = statuses[int(metric.gauge.value)] # value can be 0 or 1
tags.append('status:{}'.format(status))
self.gauge(metric_name, 1, tags) # metric value is always one, value is on the tags
if message.type < len(METRIC_TYPES):
for metric in message.metric:
tags = ['{}:{}'.format(label.name, label.value) for label in metric.label]
status = statuses[int(getattr(metric, METRIC_TYPES[message.type]).value)] # value can be 0 or 1
tags.append('status:{}'.format(status))
self.gauge(metric_name, 1, tags) # metric value is always one, value is on the tags
else:
self.log.error("Metric type %s unsupported for metric %s" % (message.type, message.name))