From c0a4f62ff68bcda81d6d6c6d41dbdac05853136d Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Thu, 26 Jan 2023 20:42:59 -0500 Subject: [PATCH] xDS interop: Refresh k8s auth on 401 to pick up updated auth token (#32210) This PR adds retries on create/get requests from the test driver to the K8s API when 401 Unauthorized error is encountered. K8S python library expects the ApiClient to be cycled on auth token refreshes. The problem is described in kubernetes-client/python#741. Currently we don't have any hypotheses why we weren't affected by this problem before. To force the ApiClient to pick up the new credentials, I shut down the current client, create a new one, and replace api_client properties on all k8s APIs we manage. This should also work with the Watch-based log collector recovering from an error. To support that, I replace default Configuration so that the next time Watch creates ApiClient implicitly, the Configuration with updated token will be used. --- framework/infrastructure/k8s.py | 62 ++++++++++++++----- .../test_app/runners/k8s/k8s_base_runner.py | 23 ++++++- 2 files changed, 67 insertions(+), 18 deletions(-) diff --git a/framework/infrastructure/k8s.py b/framework/infrastructure/k8s.py index 869346e1..731ded34 100644 --- a/framework/infrastructure/k8s.py +++ b/framework/infrastructure/k8s.py @@ -15,7 +15,6 @@ # added to get around circular dependencies caused by k8s.py clashing with # k8s/__init__.py import datetime -import functools import json import logging import pathlib @@ -33,10 +32,12 @@ from framework.infrastructure.k8s_internal import k8s_port_forwarder logger = logging.getLogger(__name__) + # Type aliases _HighlighterYaml = framework.helpers.highlighter.HighlighterYaml PodLogCollector = k8s_log_collector.PodLogCollector PortForwarder = k8s_port_forwarder.PortForwarder +ApiClient = client.ApiClient V1Deployment = client.V1Deployment V1ServiceAccount = client.V1ServiceAccount V1Pod = client.V1Pod @@ -44,20 +45,27 @@ V1Service = client.V1Service V1Namespace = client.V1Namespace ApiException = client.ApiException +FailToCreateError = utils.FailToCreateError -def simple_resource_get(func): +def _simple_resource_get(func): - def wrap_not_found_return_none(*args, **kwargs): + def _wrap_simple_resource_get(self: 'KubernetesNamespace', *args, **kwargs): try: - return func(*args, **kwargs) - except client.ApiException as e: + return func(self, *args, **kwargs) + except ApiException as e: if e.status == 404: - # Ignore 404 + # Instead of trowing an error when a resource doesn't exist, + # just return None. return None + elif e.status == 401: + # 401 Unauthorized: token might be expired, attempt auth refresh + self.refresh_auth() + return func(self, *args, **kwargs) + # Reraise for anything else. raise - return wrap_not_found_return_none + return _wrap_simple_resource_get def label_dict_to_selector(labels: dict) -> str: @@ -65,19 +73,37 @@ def label_dict_to_selector(labels: dict) -> str: class KubernetesApiManager: + _client: ApiClient + context: str + apps: client.AppsV1Api + core: client.CoreV1Api + _apis: set - def __init__(self, context): + def __init__(self, context: str): self.context = context - self.client = self._cached_api_client_for_context(context) + self._client = self._new_client_from_context(context) self.apps = client.AppsV1Api(self.client) self.core = client.CoreV1Api(self.client) + self._apis = {self.apps, self.core} + + @property + def client(self) -> ApiClient: + return self._client def close(self): self.client.close() - @classmethod - @functools.lru_cache(None) - def _cached_api_client_for_context(cls, context: str) -> client.ApiClient: + def reload(self): + self.close() + self._client = self._new_client_from_context(self.context) + # Update default configuration so that modules that initialize + # ApiClient implicitly (e.g. kubernetes.watch.Watch) get the updates. + client.Configuration.set_default(self._client.configuration) + for api in self._apis: + api.api_client = self._client + + @staticmethod + def _new_client_from_context(context: str) -> ApiClient: client_instance = kubernetes.config.new_client_from_config( context=context) logger.info('Using kubernetes context "%s", active host: %s', context, @@ -101,16 +127,20 @@ def __init__(self, api: KubernetesApiManager, name: str): self.name = name self.api = api + def refresh_auth(self): + logger.info('Reloading k8s api client to refresh the auth.') + self.api.reload() + def apply_manifest(self, manifest): return utils.create_from_dict(self.api.client, manifest, namespace=self.name) - @simple_resource_get + @_simple_resource_get def get_service(self, name) -> V1Service: return self.api.core.read_namespaced_service(name, self.name) - @simple_resource_get + @_simple_resource_get def get_service_account(self, name) -> V1Service: return self.api.core.read_namespaced_service_account(name, self.name) @@ -134,7 +164,7 @@ def delete_service_account(self, propagation_policy='Foreground', grace_period_seconds=grace_period_seconds)) - @simple_resource_get + @_simple_resource_get def get(self) -> V1Namespace: return self.api.core.read_namespace(self.name) @@ -202,7 +232,7 @@ def get_service_neg(self, service_name: str, neg_zones: List[str] = neg_info['zones'] return neg_name, neg_zones - @simple_resource_get + @_simple_resource_get def get_deployment(self, name) -> V1Deployment: return self.api.apps.read_namespaced_deployment(name, self.name) diff --git a/framework/test_app/runners/k8s/k8s_base_runner.py b/framework/test_app/runners/k8s/k8s_base_runner.py index a7c42797..7ef15209 100644 --- a/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/framework/test_app/runners/k8s/k8s_base_runner.py @@ -121,7 +121,7 @@ def _template_file_from_name(cls, template_name): cls.TEMPLATE_DIR_RELATIVE_PATH) return templates_path.joinpath(template_name).resolve() - def _create_from_template(self, template_name, **kwargs): + def _create_from_template(self, template_name, **kwargs) -> object: template_file = self._template_file_from_name(template_name) logger.debug("Loading k8s manifest template: %s", template_file) @@ -135,7 +135,26 @@ def _create_from_template(self, template_name, **kwargs): if next(manifests, False): raise _RunnerError('Exactly one document expected in manifest ' f'{template_file}') - k8s_objects = self.k8s_namespace.apply_manifest(manifest) + + # TODO(sergiitk, b/178378578): add a retryer. + try: + k8s_objects = self.k8s_namespace.apply_manifest(manifest) + except k8s.FailToCreateError as err_create: + # Since we verified this is not a multi-doc yaml, we should + # expect a single exception. Otherwise, something went horribly + # wrong, or API promises got broken. + if len(err_create.api_exceptions) != 1: + raise + + api_exception: k8s.ApiException = err_create.api_exceptions[0] + if api_exception.status == 401: + # 401 Unauthorized: token might be expired, attempt auth refresh + self.k8s_namespace.refresh_auth() + k8s_objects = self.k8s_namespace.apply_manifest(manifest) + else: + # Reraise for anything else. + raise + if len(k8s_objects) != 1: raise _RunnerError('Expected exactly one object must created from ' f'manifest {template_file}')