Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kube] Allow to run agent on the node #3401

Merged
merged 5 commits into from
Jul 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions tests/core/test_kube_event_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ def test_namespace_serverside_filtering(self):
with patch.object(self.kube, 'retrieve_json_auth', return_value={}) as mock_method:
retr = KubeEventRetriever(self.kube, namespaces=['testns'])
retr.get_event_array()
mock_method.assert_called_once_with('https://kubernetes/api/v1/namespaces/testns/events', params={})
mock_method.assert_called_once_with('https://kubernetes:443/api/v1/namespaces/testns/events', params={})

def test_namespace_clientside_filtering(self):
val = self._build_events([('ns1', 'k1'), ('ns2', 'k1'), ('testns', 'k1')])
with patch.object(self.kube, 'retrieve_json_auth', return_value=val) as mock_method:
retr = KubeEventRetriever(self.kube, namespaces=['testns', 'ns2'])
events = retr.get_event_array()
self.assertEquals(2, len(events))
mock_method.assert_called_once_with('https://kubernetes/api/v1/events', params={})
mock_method.assert_called_once_with('https://kubernetes:443/api/v1/events', params={})

def test_kind_serverside_filtering(self):
with patch.object(self.kube, 'retrieve_json_auth', return_value={}) as mock_method:
retr = KubeEventRetriever(self.kube, kinds=['k1'])
retr.get_event_array()
mock_method.assert_called_once_with('https://kubernetes/api/v1/events',
mock_method.assert_called_once_with('https://kubernetes:443/api/v1/events',
params={'fieldSelector': 'involvedObject.kind=k1'})

def test_kind_clientside_filtering(self):
Expand All @@ -92,4 +92,4 @@ def test_kind_clientside_filtering(self):
retr = KubeEventRetriever(self.kube, kinds=['k1', 'k2'])
events = retr.get_event_array()
self.assertEquals(3, len(events))
mock_method.assert_called_once_with('https://kubernetes/api/v1/events', params={})
mock_method.assert_called_once_with('https://kubernetes:443/api/v1/events', params={})
13 changes: 11 additions & 2 deletions utils/dockerutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from docker import Client, tls

# project
from utils.platform import Platform
from utils.singleton import Singleton

SWARM_SVC_LABEL = 'com.docker.swarm.service.name'
Expand Down Expand Up @@ -80,6 +79,7 @@ def __init__(self, **kwargs):
# Try to detect if an orchestrator is running
self._is_ecs = False
self._is_rancher = False
self._is_k8s = False

try:
containers = self.client.containers()
Expand All @@ -94,12 +94,18 @@ def __init__(self, **kwargs):
log.warning("Error while detecting orchestrator: %s" % e)
pass

try:
from utils.kubernetes import detect_is_k8s
self._is_k8s = detect_is_k8s()
except Exception:
self._is_k8s = False

# Build include/exclude patterns for containers
self._include, self._exclude = instance.get('include', []), instance.get('exclude', [])
if not self._exclude:
# In Kubernetes, pause containers are not interesting to monitor.
# This part could be reused for other platforms where containers can be safely ignored.
if Platform.is_k8s():
if self.is_k8s():
self.filtering_enabled = True
self._exclude = DEFAULT_CONTAINER_EXCLUDE
else:
Expand Down Expand Up @@ -147,6 +153,9 @@ def is_ecs(self):
def is_rancher(self):
return self._is_rancher

def is_k8s(self):
return self._is_k8s

def is_swarm(self):
if self.swarm_node_state == 'pending':
self.fetch_swarm_state()
Expand Down
1 change: 1 addition & 0 deletions utils/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
from .pod_service_mapper import PodServiceMapper # noqa: F401
from .kube_state_processor import KubeStateProcessor # noqa: F401
from .kube_state_processor import NAMESPACE # noqa: F401
from .kubeutil import detect_is_k8s # noqa: F401
from .kubeutil import KubeUtil # noqa: F401
53 changes: 41 additions & 12 deletions utils/kubernetes/kubeutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@
}


def detect_is_k8s():
"""
Logic for DockerUtil to detect whether to enable Kubernetes code paths
It check whether we have a KUBERNETES_PORT environment variable (running
in a pod) or a valid kubernetes.yaml conf file
"""
if 'KUBERNETES_PORT' in os.environ:
return True
else:
try:
k8_config_file_path = get_conf_path(KUBERNETES_CHECK_NAME)
k8_check_config = check_yaml(k8_config_file_path)
return len(k8_check_config['instances']) > 0
except Exception as err:
log.debug("Error detecting kubernetes: %s" % str(err))
return False


class KubeUtil:
__metaclass__ = Singleton

Expand All @@ -46,7 +64,7 @@ class KubeUtil:
DEFAULT_CADVISOR_PORT = 4194
DEFAULT_HTTP_KUBELET_PORT = 10255
DEFAULT_HTTPS_KUBELET_PORT = 10250
DEFAULT_MASTER_PORT = 8080
DEFAULT_MASTER_PORT = 443
DEFAULT_MASTER_NAME = 'kubernetes' # DNS name to reach the master from a pod.
DEFAULT_LABEL_PREFIX = 'kube_'
DEFAULT_COLLECT_SERVICE_TAG = True
Expand All @@ -66,6 +84,7 @@ def __init__(self, instance=None):
# kubernetes.yaml was not found
except IOError as ex:
log.error(ex.message)

instance = {}
except Exception:
log.error('Kubernetes configuration file is invalid. '
Expand All @@ -78,9 +97,15 @@ def __init__(self, instance=None):
self.tls_settings = self._init_tls_settings(instance)

# apiserver
self.kubernetes_api_root_url = 'https://%s' % (os.environ.get('KUBERNETES_SERVICE_HOST') or
self.DEFAULT_MASTER_NAME)
if 'api_server_url' in instance:
self.kubernetes_api_root_url = instance.get('api_server_url')
else:
master_host = os.environ.get('KUBERNETES_SERVICE_HOST') or self.DEFAULT_MASTER_NAME
master_port = os.environ.get('KUBERNETES_SERVICE_PORT') or self.DEFAULT_MASTER_PORT
self.kubernetes_api_root_url = 'https://%s:%d' % (master_host, master_port)

self.kubernetes_api_url = '%s/api/v1' % self.kubernetes_api_root_url

# kubelet
try:
self.kubelet_api_url = self._locate_kubelet(instance)
Expand Down Expand Up @@ -128,10 +153,6 @@ def _init_tls_settings(self, instance):
if apiserver_cacert and os.path.exists(apiserver_cacert):
tls_settings['apiserver_cacert'] = apiserver_cacert

token = self.get_auth_token()
if token:
tls_settings['bearer_token'] = token

# kubelet
kubelet_client_crt = instance.get('kubelet_client_crt')
kubelet_client_key = instance.get('kubelet_client_key')
Expand All @@ -144,6 +165,12 @@ def _init_tls_settings(self, instance):
else:
tls_settings['kubelet_verify'] = instance.get('kubelet_tls_verify', DEFAULT_TLS_VERIFY)

if ('apiserver_client_cert' not in tls_settings) or ('kubelet_client_cert' not in tls_settings):
# Only lookup token if we don't have client certs for both
token = self.get_auth_token(instance)
if token:
tls_settings['bearer_token'] = token

return tls_settings

def _locate_kubelet(self, instance):
Expand Down Expand Up @@ -296,8 +323,8 @@ def perform_kubelet_query(self, url, verbose=True, timeout=10):
verify = tls_context.get('kubelet_verify', DEFAULT_TLS_VERIFY)

# if cert-based auth is enabled, don't use the token.
if not cert and url.lower().startswith('https'):
headers = {'Authorization': 'Bearer {}'.format(self.get_auth_token())}
if not cert and url.lower().startswith('https') and 'bearer_token' in self.tls_settings:
headers = {'Authorization': 'Bearer {}'.format(self.tls_settings.get('bearer_token'))}

return requests.get(url, timeout=timeout, verify=verify,
cert=cert, headers=headers, params={'verbose': verbose})
Expand Down Expand Up @@ -409,15 +436,17 @@ def are_tags_filtered(self, tags):
return self.docker_util.are_tags_filtered(tags)

@classmethod
def get_auth_token(cls):
def get_auth_token(cls, instance):
"""
Return a string containing the authorization token for the pod.
"""

token_path = instance.get('bearer_token_path', cls.AUTH_TOKEN_PATH)
try:
with open(cls.AUTH_TOKEN_PATH) as f:
with open(token_path) as f:
return f.read()
except IOError as e:
log.error('Unable to read token from {}: {}'.format(cls.AUTH_TOKEN_PATH, e))
log.error('Unable to read token from {}: {}'.format(token_path, e))

return None

Expand Down
7 changes: 2 additions & 5 deletions utils/orchestrator/kubeutilproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Licensed under Simplified BSD License (see LICENSE)

from utils.kubernetes import KubeUtil
from utils.platform import Platform
from .baseutil import BaseUtil


Expand All @@ -12,11 +13,7 @@ def get_container_tags(self, cid=None, co=None):

@staticmethod
def is_detected():
try:
tags = KubeUtil().get_node_hosttags()
return bool(tags)
except Exception:
return False
return Platform.is_k8s()

def get_host_tags(self):
return KubeUtil().get_node_hosttags()
3 changes: 2 additions & 1 deletion utils/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ def is_containerized():

@staticmethod
def is_k8s():
return 'KUBERNETES_PORT' in os.environ
from utils.dockerutil import DockerUtil
return DockerUtil().is_k8s()

@staticmethod
def is_rancher():
Expand Down