diff --git a/examples/example1.py b/examples/example1.py index bcc25ad8b..a63955abb 100644 --- a/examples/example1.py +++ b/examples/example1.py @@ -7,7 +7,7 @@ async def main(): # Configs can be set in Configuration class directly or using helper # utility. If no argument provided, the config will be loaded from # default location. - config.load_kube_config() + await config.load_kube_config() v1 = client.CoreV1Api() print("Listing pods with their IPs:") diff --git a/examples/example2.py b/examples/example2.py index 14ff25082..21c28c90a 100644 --- a/examples/example2.py +++ b/examples/example2.py @@ -7,7 +7,7 @@ async def main(): # Configs can be set in Configuration class directly or using helper # utility. If no argument provided, the config will be loaded from # default location. - config.load_kube_config() + await config.load_kube_config() v1 = client.CoreV1Api() count = 10 diff --git a/examples/example3.py b/examples/example3.py index a1cf334b9..0124296eb 100644 --- a/examples/example3.py +++ b/examples/example3.py @@ -8,7 +8,7 @@ async def main(): # Configs can be set in Configuration class directly or using helper # utility. If no argument provided, the config will be loaded from # default location. - config.load_kube_config() + await config.load_kube_config() v1 = client.CoreV1Api() diff --git a/examples/example4.py b/examples/example4.py index 187dcdd7d..6c63c20e6 100644 --- a/examples/example4.py +++ b/examples/example4.py @@ -8,20 +8,22 @@ async def watch_namespaces(): v1 = client.CoreV1Api() async for event in watch.Watch().stream(v1.list_namespace): etype, obj = event['type'], event['object'] - print(f"{etype} namespace {obj.metadata.name}") + print("{} namespace {}".format(etype, obj.metadata.name)) async def watch_pods(): v1 = client.CoreV1Api() async for event in watch.Watch().stream(v1.list_pod_for_all_namespaces): evt, obj = event['type'], event['object'] - print(f"{evt} pod {obj.metadata.name} in NS {obj.metadata.namespace}") + print("{} pod {} in NS {}".format(evt, obj.metadata.name, obj.metadata.namespace)) def main(): + loop = asyncio.get_event_loop() + # Load the kubeconfig file specified in the KUBECONFIG environment # variable, or fall back to `~/.kube/config`. - config.load_kube_config() + loop.run_until_complete(config.load_kube_config()) # Define the tasks to watch namespaces and pods. tasks = [ @@ -30,7 +32,6 @@ def main(): ] # Push tasks into event loop. - loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.close() diff --git a/examples/tail.py b/examples/tail.py index f4180141a..3db7578d6 100644 --- a/examples/tail.py +++ b/examples/tail.py @@ -51,7 +51,7 @@ async def print_pod_log(pod, namespace, container, lines, follow): async def main(): args = parse_args() - config.load_kube_config() + loader = await config.load_kube_config() v1 = client.CoreV1Api() ret = await v1.list_namespaced_pod(args.namespace) @@ -69,6 +69,10 @@ async def main(): print('No matching PODs !') return + if args.follow: + # autorefresh gcp token + cmd.append(config.refresh_token(loader)) + await asyncio.wait(cmd) diff --git a/kubernetes_asyncio/config/__init__.py b/kubernetes_asyncio/config/__init__.py index 3476ff714..3eebd4bc2 100644 --- a/kubernetes_asyncio/config/__init__.py +++ b/kubernetes_asyncio/config/__init__.py @@ -15,4 +15,4 @@ from .config_exception import ConfigException from .incluster_config import load_incluster_config from .kube_config import (list_kube_config_contexts, load_kube_config, - new_client_from_config) + new_client_from_config, refresh_token) diff --git a/kubernetes_asyncio/config/google_auth.py b/kubernetes_asyncio/config/google_auth.py new file mode 100644 index 000000000..ebad1fe28 --- /dev/null +++ b/kubernetes_asyncio/config/google_auth.py @@ -0,0 +1,27 @@ +import asyncio.subprocess +import json +import shlex +from types import SimpleNamespace + + +async def google_auth_credentials(provider): + + if 'cmd-path' not in provider or 'cmd-args' not in provider: + raise ValueError('GoogleAuth via gcloud is supported! Values for cmd-path, cmd-args are required.') + + cmd = shlex.split(' '.join((provider['cmd-path'], provider['cmd-args']))) + cmd_exec = asyncio.create_subprocess_exec(*cmd, + stdin=None, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + proc = await cmd_exec + + data = await proc.stdout.read() + data = data.decode('ascii').rstrip() + data = json.loads(data) + + await proc.wait() + return SimpleNamespace( + token=data['credential']['access_token'], + expiry=data['credential']['token_expiry'] + ) diff --git a/kubernetes_asyncio/config/google_auth_test.py b/kubernetes_asyncio/config/google_auth_test.py new file mode 100644 index 000000000..de4a6db65 --- /dev/null +++ b/kubernetes_asyncio/config/google_auth_test.py @@ -0,0 +1,42 @@ +# Copyright 2016 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from asynctest import TestCase, main + +from .google_auth import google_auth_credentials + + +class TestGoogleAuth(TestCase): + + async def test_google_auth_credentials(self): + + provider = { + 'cmd-path': '/bin/echo', + 'cmd-args': '{\\"credential\\": {\\"access_token\\": \\"token\\", ' + '\\"token_expiry\\": \\"2001.01.01T00:00:00Z\\"}}' + } + + ret = await google_auth_credentials(provider) + + self.assertEqual(ret.token, 'token') + self.assertEqual(ret.expiry, '2001.01.01T00:00:00Z') + + async def test_google_auth_credentials_exception(self): + + with self.assertRaisesRegex(ValueError, "cmd-path, cmd-args are required."): + await google_auth_credentials({}) + + +if __name__ == '__main__': + main() diff --git a/kubernetes_asyncio/config/kube_config.py b/kubernetes_asyncio/config/kube_config.py index 95385aa36..ea7b91c5e 100644 --- a/kubernetes_asyncio/config/kube_config.py +++ b/kubernetes_asyncio/config/kube_config.py @@ -12,21 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import atexit import base64 import datetime import os import tempfile -import google.auth -import google.auth.transport.requests import urllib3 import yaml from kubernetes_asyncio.client import ApiClient, Configuration from .config_exception import ConfigException -from .dateutil import UTC, format_rfc3339, parse_rfc3339 +from .dateutil import UTC, parse_rfc3339 +from .google_auth import google_auth_credentials EXPIRY_SKEW_PREVENTION_DELAY = datetime.timedelta(minutes=5) KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config') @@ -59,7 +59,7 @@ def _create_temp_file_with_content(content): def _is_expired(expiry): - return ((parse_rfc3339(expiry) + EXPIRY_SKEW_PREVENTION_DELAY) <= + return ((parse_rfc3339(expiry) - EXPIRY_SKEW_PREVENTION_DELAY) <= datetime.datetime.utcnow().replace(tzinfo=UTC)) @@ -124,22 +124,14 @@ def __init__(self, config_dict, active_context=None, self._current_context = None self._user = None self._cluster = None + self.provider = None self.set_active_context(active_context) self._config_base_path = config_base_path self._config_persister = config_persister - - def _refresh_credentials(): - credentials, project_id = google.auth.default( - scopes=['https://www.googleapis.com/auth/cloud-platform'] - ) - request = google.auth.transport.requests.Request() - credentials.refresh(request) - return credentials - if get_google_credentials: self._get_google_credentials = get_google_credentials else: - self._get_google_credentials = _refresh_credentials + self._get_google_credentials = None def set_active_context(self, context_name=None): if context_name is None: @@ -158,8 +150,10 @@ def set_active_context(self, context_name=None): self._user = None self._cluster = self._config['clusters'].get_with_name( self._current_context['context']['cluster'])['cluster'] + if self._user is not None and 'auth-provider' in self._user and 'name' in self._user['auth-provider']: + self.provider = self._user['auth-provider']['name'] - def _load_authentication(self): + async def _load_authentication(self): """Read authentication from kube-config user section if exists. This function goes through various authentication methods in user @@ -173,40 +167,39 @@ def _load_authentication(self): """ if not self._user: return - if self._load_gcp_token(): + + if self.provider == 'gcp': + await self.load_gcp_token() return + if self._load_user_token(): return self._load_user_pass_token() - def _load_gcp_token(self): - if 'auth-provider' not in self._user: - return - provider = self._user['auth-provider'] - if 'name' not in provider: - return - if provider['name'] != 'gcp': - return - - if (('config' not in provider) or - ('access-token' not in provider['config']) or - ('expiry' in provider['config'] and - _is_expired(provider['config']['expiry']))): - # token is not available or expired, refresh it - self._refresh_gcp_token() + async def load_gcp_token(self): - self.token = "Bearer %s" % provider['config']['access-token'] - return self.token - - def _refresh_gcp_token(self): if 'config' not in self._user['auth-provider']: self._user['auth-provider'].value['config'] = {} - provider = self._user['auth-provider']['config'] - credentials = self._get_google_credentials() - provider.value['access-token'] = credentials.token - provider.value['expiry'] = format_rfc3339(credentials.expiry) - if self._config_persister: - self._config_persister(self._config.value) + + config = self._user['auth-provider']['config'] + + if (('access-token' not in config) or + ('expiry' in config and _is_expired(config['expiry']))): + + if self._get_google_credentials is not None: + if asyncio.iscoroutinefunction(self._get_google_credentials): + credentials = await self._get_google_credentials() + else: + credentials = self._get_google_credentials() + else: + credentials = await google_auth_credentials(config) + config.value['access-token'] = credentials.token + config.value['expiry'] = credentials.expiry + if self._config_persister: + self._config_persister(self._config.value) + + self.token = "Bearer %s" % config['access-token'] + return self.token def _load_user_token(self): token = FileOrData( @@ -241,16 +234,18 @@ def _load_cluster_info(self): self.verify_ssl = not self._cluster['insecure-skip-tls-verify'] def _set_config(self, client_configuration): + if 'token' in self.__dict__: client_configuration.api_key['authorization'] = self.token + # copy these keys directly from self to configuration object keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl'] for key in keys: if key in self.__dict__: setattr(client_configuration, key, getattr(self, key)) - def load_and_set(self, client_configuration): - self._load_authentication() + async def load_and_set(self, client_configuration): + await self._load_authentication() self._load_cluster_info() self._set_config(client_configuration) @@ -339,9 +334,9 @@ def list_kube_config_contexts(config_file=None): return loader.list_contexts(), loader.current_context -def load_kube_config(config_file=None, context=None, - client_configuration=None, - persist_config=True): +async def load_kube_config(config_file=None, context=None, + client_configuration=None, + persist_config=True): """Loads authentication and cluster information from kube-config file and stores them in kubernetes.client.configuration. @@ -369,21 +364,43 @@ def _save_kube_config(config_map): config_persister=config_persister) if client_configuration is None: config = type.__call__(Configuration) - loader.load_and_set(config) + await loader.load_and_set(config) Configuration.set_default(config) else: - loader.load_and_set(client_configuration) + await loader.load_and_set(client_configuration) + + return loader -def new_client_from_config( - config_file=None, - context=None, - persist_config=True): +async def refresh_token(loader, client_configuration=None, interval=60): + """Refresh token if necessary, updates the token in client configurarion + + :param loader: KubeConfigLoader returned by load_kube_config + :param client_configuration: The kubernetes.client.Configuration to + set configs to. + :param interval: how often check if token is up-to-date + + """ + if loader.provider != 'gcp': + return + + if client_configuration is None: + client_configuration = Configuration() + + while 1: + await asyncio.sleep(interval) + await loader.load_gcp_token() + client_configuration.api_key['authorization'] = loader.token + + +async def new_client_from_config(config_file=None, context=None, persist_config=True): """Loads configuration the same as load_kube_config but returns an ApiClient to be used with any API object. This will allow the caller to concurrently talk with multiple clusters.""" client_config = type.__call__(Configuration) - load_kube_config(config_file=config_file, context=context, - client_configuration=client_config, - persist_config=persist_config) + + await load_kube_config(config_file=config_file, context=context, + client_configuration=client_config, + persist_config=persist_config) + return ApiClient(configuration=client_config) diff --git a/kubernetes_asyncio/config/kube_config_test.py b/kubernetes_asyncio/config/kube_config_test.py index c394c173d..748561c1a 100644 --- a/kubernetes_asyncio/config/kube_config_test.py +++ b/kubernetes_asyncio/config/kube_config_test.py @@ -17,17 +17,17 @@ import os import shutil import tempfile -import unittest from types import SimpleNamespace import yaml +from asynctest import Mock, TestCase, main, patch from six import PY3 from .config_exception import ConfigException from .kube_config import ( ConfigNode, FileOrData, KubeConfigLoader, _cleanup_temp_files, _create_temp_file_with_content, list_kube_config_contexts, - load_kube_config, new_client_from_config, + load_kube_config, new_client_from_config, refresh_token, ) BEARER_TOKEN_FORMAT = "Bearer %s" @@ -68,7 +68,7 @@ def _raise_exception(st): TEST_CLIENT_CERT_BASE64 = _base64(TEST_CLIENT_CERT) -class BaseTestCase(unittest.TestCase): +class BaseTestCase(TestCase): def setUp(self): self._temp_files = [] @@ -76,6 +76,7 @@ def setUp(self): def tearDown(self): for f in self._temp_files: os.remove(f) + patch.stopall() def _create_temp_file(self, content=""): handler, name = tempfile.mkstemp() @@ -89,6 +90,11 @@ def expect_exception(self, func, message_part, *args, **kwargs): func(*args, **kwargs) self.assertIn(message_part, str(context.exception)) + async def async_expect_exception(self, func, message_part, *args, **kwargs): + with self.assertRaises(ConfigException) as context: + await func(*args, **kwargs) + self.assertIn(message_part, str(context.exception)) + class TestFileOrData(BaseTestCase): @@ -478,19 +484,19 @@ class TestKubeConfigLoader(BaseTestCase): ] } - def test_no_user_context(self): + async def test_no_user_context(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() - KubeConfigLoader( + await KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="no_user").load_and_set(actual) self.assertEqual(expected, actual) - def test_simple_token(self): + async def test_simple_token(self): expected = FakeConfig(host=TEST_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) actual = FakeConfig() - KubeConfigLoader( + await KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="simple_token").load_and_set(actual) self.assertEqual(expected, actual) @@ -502,47 +508,66 @@ def test_load_user_token(self): self.assertTrue(loader._load_user_token()) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token) - def test_gcp_no_refresh(self): + async def test_gcp_no_refresh(self): expected = FakeConfig( host=TEST_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) actual = FakeConfig() - KubeConfigLoader( + await KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="gcp", get_google_credentials=lambda: _raise_exception( "SHOULD NOT BE CALLED")).load_and_set(actual) self.assertEqual(expected, actual) - def test_load_gcp_token_no_refresh(self): + async def test_load_gcp_token_no_refresh(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="gcp", get_google_credentials=lambda: _raise_exception( "SHOULD NOT BE CALLED")) - self.assertTrue(loader._load_gcp_token()) + res = await loader.load_gcp_token() + self.assertTrue(res) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token) - def test_load_gcp_token_with_refresh(self): + async def test_load_gcp_token_with_refresh(self): cred = SimpleNamespace( token=TEST_ANOTHER_DATA_BASE64, - expiry=datetime.datetime.now(), + expiry=datetime.datetime.now() ) loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_gcp", get_google_credentials=lambda: cred) - self.assertTrue(loader._load_gcp_token()) + res = await loader.load_gcp_token() + self.assertTrue(res) + self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, + loader.token) + + async def test_async_load_gcp_token_with_refresh(self): + + async def cred(): + return SimpleNamespace( + token=TEST_ANOTHER_DATA_BASE64, + expiry=datetime.datetime.now() + ) + + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="expired_gcp", + get_google_credentials=cred) + res = await loader.load_gcp_token() + self.assertTrue(res) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token) - def test_user_pass(self): + async def test_user_pass(self): expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN) actual = FakeConfig() - KubeConfigLoader( + await KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="user_pass").load_and_set(actual) self.assertEqual(expected, actual) @@ -554,16 +579,16 @@ def test_load_user_pass_token(self): self.assertTrue(loader._load_user_pass_token()) self.assertEqual(TEST_BASIC_TOKEN, loader.token) - def test_ssl_no_cert_files(self): + async def test_ssl_no_cert_files(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl-no_file") - self.expect_exception( + await self.async_expect_exception( loader.load_and_set, "does not exists", FakeConfig()) - def test_ssl(self): + async def test_ssl(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, @@ -572,12 +597,12 @@ def test_ssl(self): ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH) ) actual = FakeConfig() - KubeConfigLoader( + await KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl").load_and_set(actual) self.assertEqual(expected, actual) - def test_ssl_no_verification(self): + async def test_ssl_no_verification(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, @@ -587,7 +612,7 @@ def test_ssl_no_verification(self): ssl_ca_cert=None, ) actual = FakeConfig() - KubeConfigLoader( + await KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="no_ssl_verification").load_and_set(actual) self.assertEqual(expected, actual) @@ -615,7 +640,7 @@ def test_set_active_context(self): self.assertEqual(expected_contexts.get_with_name("ssl").value, loader.current_context) - def test_ssl_with_relative_ssl_files(self): + async def test_ssl_with_relative_ssl_files(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, @@ -634,7 +659,7 @@ def test_ssl_with_relative_ssl_files(self): fd.write(TEST_CLIENT_KEY.encode()) with open(os.path.join(temp_dir, "token_file"), "wb") as fd: fd.write(TEST_DATA_BASE64.encode()) - KubeConfigLoader( + await KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl-local-file", config_base_path=temp_dir).load_and_set(actual) @@ -642,13 +667,14 @@ def test_ssl_with_relative_ssl_files(self): finally: shutil.rmtree(temp_dir) - def test_load_kube_config(self): + async def test_load_kube_config(self): expected = FakeConfig(host=TEST_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) config_file = self._create_temp_file(yaml.dump(self.TEST_KUBE_CONFIG)) actual = FakeConfig() - load_kube_config(config_file=config_file, context="simple_token", - client_configuration=actual) + await load_kube_config(config_file=config_file, + context="simple_token", + client_configuration=actual) self.assertEqual(expected, actual) def test_list_kube_config_contexts(self): @@ -664,32 +690,50 @@ def test_list_kube_config_contexts(self): self.assertItemsEqual(self.TEST_KUBE_CONFIG['contexts'], contexts) - def test_new_client_from_config(self): + async def test_new_client_from_config(self): config_file = self._create_temp_file(yaml.dump(self.TEST_KUBE_CONFIG)) - client = new_client_from_config( + client = await new_client_from_config( config_file=config_file, context="simple_token") self.assertEqual(TEST_HOST, client.configuration.host) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, client.configuration.api_key['authorization']) - def test_no_users_section(self): + async def test_no_users_section(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() test_kube_config = self.TEST_KUBE_CONFIG.copy() del test_kube_config['users'] - KubeConfigLoader( + await KubeConfigLoader( config_dict=test_kube_config, active_context="gcp").load_and_set(actual) self.assertEqual(expected, actual) - def test_non_existing_user(self): + async def test_non_existing_user(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() - KubeConfigLoader( + await KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="non_existing_user").load_and_set(actual) self.assertEqual(expected, actual) + async def test_refresh_token(self): + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="gcp", + get_google_credentials=lambda: _raise_exception( + "SHOULD NOT BE CALLED")) + mock_sleep = patch('asyncio.sleep').start() + mock_sleep.side_effect = [0, AssertionError] + + mock_config = Mock() + mock_config.api_key = {} + + with self.assertRaises(AssertionError): + await refresh_token(loader, mock_config) + + self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, + loader.token) + if __name__ == '__main__': - unittest.main() + main() diff --git a/kubernetes_asyncio/e2e_test/base.py b/kubernetes_asyncio/e2e_test/base.py index e9b41074d..e8f1bf960 100644 --- a/kubernetes_asyncio/e2e_test/base.py +++ b/kubernetes_asyncio/e2e_test/base.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +import asyncio import http.client import os import unittest @@ -25,7 +26,8 @@ def get_e2e_configuration(): config.host = None if os.path.exists( os.path.expanduser(kube_config.KUBE_CONFIG_DEFAULT_LOCATION)): - kube_config.load_kube_config(client_configuration=config) + loop = asyncio.get_event_loop() + loop.run_until_complete(kube_config.load_kube_config(client_configuration=config)) else: print('Unable to load config from %s' % kube_config.KUBE_CONFIG_DEFAULT_LOCATION) diff --git a/requirements.txt b/requirements.txt index bed56e7cd..25afef7c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,4 @@ python-dateutil>=2.5.3 # BSD setuptools>=21.0.0 # PSF/ZPL urllib3>=1.19.1,!=1.21,<1.23 # MIT pyyaml>=3.12 # MIT -google-auth>=1.0.1 # Apache-2.0 -requests<=2.18.4 # Apache-2.0 -requests-oauthlib # ISC aiohttp>=2.3.10 # # Apache-2.0