Skip to content

Commit

Permalink
xDS interop: Refresh k8s auth on 401 to pick up updated auth token (#…
Browse files Browse the repository at this point in the history
…32210)

This PR adds retries on create/get requests from the test driver to the K8s API when a 401 Unauthorized error is encountered.
The Kubernetes Python library expects the ApiClient to be recreated whenever the auth token is refreshed.

The problem is described in kubernetes-client/python#741. Currently, we have no hypothesis as to why we weren't affected by this problem before.

To force the ApiClient to pick up the new credentials, I shut down the current client, create a new one, and replace api_client properties on all k8s APIs we manage.

This should also work with the Watch-based log collector recovering from an error. To support that, I replace the default Configuration so that the next time Watch creates an ApiClient implicitly, the Configuration with the updated token will be used.
  • Loading branch information
sergiitk authored Jan 27, 2023
1 parent cf9d1c6 commit c0a4f62
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
62 changes: 46 additions & 16 deletions framework/infrastructure/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# added to get around circular dependencies caused by k8s.py clashing with
# k8s/__init__.py
import datetime
import functools
import json
import logging
import pathlib
Expand All @@ -33,51 +32,78 @@
from framework.infrastructure.k8s_internal import k8s_port_forwarder

logger = logging.getLogger(__name__)

# Type aliases
_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
PodLogCollector = k8s_log_collector.PodLogCollector
PortForwarder = k8s_port_forwarder.PortForwarder
ApiClient = client.ApiClient
V1Deployment = client.V1Deployment
V1ServiceAccount = client.V1ServiceAccount
V1Pod = client.V1Pod
V1PodList = client.V1PodList
V1Service = client.V1Service
V1Namespace = client.V1Namespace
ApiException = client.ApiException
FailToCreateError = utils.FailToCreateError


def simple_resource_get(func):
def _simple_resource_get(func):

def wrap_not_found_return_none(*args, **kwargs):
def _wrap_simple_resource_get(self: 'KubernetesNamespace', *args, **kwargs):
try:
return func(*args, **kwargs)
except client.ApiException as e:
return func(self, *args, **kwargs)
except ApiException as e:
if e.status == 404:
# Ignore 404
# Instead of throwing an error when a resource doesn't exist,
# just return None.
return None
elif e.status == 401:
# 401 Unauthorized: token might be expired, attempt auth refresh
self.refresh_auth()
return func(self, *args, **kwargs)
# Reraise for anything else.
raise

return wrap_not_found_return_none
return _wrap_simple_resource_get


def label_dict_to_selector(labels: dict) -> str:
    """Render a label mapping as a comma-separated selector string.

    Each item of ``labels`` becomes a ``key==value`` term; the terms are
    joined with commas in the mapping's iteration order. An empty mapping
    yields an empty string.
    """
    terms = []
    for key, value in labels.items():
        terms.append(f'{key}=={value}')
    return ','.join(terms)


class KubernetesApiManager:
_client: ApiClient
context: str
apps: client.AppsV1Api
core: client.CoreV1Api
_apis: set

def __init__(self, context):
def __init__(self, context: str):
self.context = context
self.client = self._cached_api_client_for_context(context)
self._client = self._new_client_from_context(context)
self.apps = client.AppsV1Api(self.client)
self.core = client.CoreV1Api(self.client)
self._apis = {self.apps, self.core}

@property
def client(self) -> ApiClient:
return self._client

def close(self):
self.client.close()

@classmethod
@functools.lru_cache(None)
def _cached_api_client_for_context(cls, context: str) -> client.ApiClient:
def reload(self):
self.close()
self._client = self._new_client_from_context(self.context)
# Update default configuration so that modules that initialize
# ApiClient implicitly (e.g. kubernetes.watch.Watch) get the updates.
client.Configuration.set_default(self._client.configuration)
for api in self._apis:
api.api_client = self._client

@staticmethod
def _new_client_from_context(context: str) -> ApiClient:
client_instance = kubernetes.config.new_client_from_config(
context=context)
logger.info('Using kubernetes context "%s", active host: %s', context,
Expand All @@ -101,16 +127,20 @@ def __init__(self, api: KubernetesApiManager, name: str):
self.name = name
self.api = api

def refresh_auth(self):
    """Reload the underlying k8s API manager to refresh the auth.

    Logs the reload at info level, then delegates to ``self.api.reload()``.
    """
    logger.info('Reloading k8s api client to refresh the auth.')
    api_manager = self.api
    api_manager.reload()

def apply_manifest(self, manifest):
    """Create the objects described by ``manifest`` in this namespace.

    Passes the manager's current API client, the manifest, and this
    namespace's name to ``utils.create_from_dict`` and returns its result.
    """
    created = utils.create_from_dict(self.api.client,
                                     manifest,
                                     namespace=self.name)
    return created

@simple_resource_get
@_simple_resource_get
def get_service(self, name) -> V1Service:
return self.api.core.read_namespaced_service(name, self.name)

@simple_resource_get
@_simple_resource_get
def get_service_account(self, name) -> V1Service:
return self.api.core.read_namespaced_service_account(name, self.name)

Expand All @@ -134,7 +164,7 @@ def delete_service_account(self,
propagation_policy='Foreground',
grace_period_seconds=grace_period_seconds))

@simple_resource_get
@_simple_resource_get
def get(self) -> V1Namespace:
return self.api.core.read_namespace(self.name)

Expand Down Expand Up @@ -202,7 +232,7 @@ def get_service_neg(self, service_name: str,
neg_zones: List[str] = neg_info['zones']
return neg_name, neg_zones

@simple_resource_get
@_simple_resource_get
def get_deployment(self, name) -> V1Deployment:
return self.api.apps.read_namespaced_deployment(name, self.name)

Expand Down
23 changes: 21 additions & 2 deletions framework/test_app/runners/k8s/k8s_base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def _template_file_from_name(cls, template_name):
cls.TEMPLATE_DIR_RELATIVE_PATH)
return templates_path.joinpath(template_name).resolve()

def _create_from_template(self, template_name, **kwargs):
def _create_from_template(self, template_name, **kwargs) -> object:
template_file = self._template_file_from_name(template_name)
logger.debug("Loading k8s manifest template: %s", template_file)

Expand All @@ -135,7 +135,26 @@ def _create_from_template(self, template_name, **kwargs):
if next(manifests, False):
raise _RunnerError('Exactly one document expected in manifest '
f'{template_file}')
k8s_objects = self.k8s_namespace.apply_manifest(manifest)

# TODO(sergiitk, b/178378578): add a retryer.
try:
k8s_objects = self.k8s_namespace.apply_manifest(manifest)
except k8s.FailToCreateError as err_create:
# Since we verified this is not a multi-doc yaml, we should
# expect a single exception. Otherwise, something went horribly
# wrong, or API promises got broken.
if len(err_create.api_exceptions) != 1:
raise

api_exception: k8s.ApiException = err_create.api_exceptions[0]
if api_exception.status == 401:
# 401 Unauthorized: token might be expired, attempt auth refresh
self.k8s_namespace.refresh_auth()
k8s_objects = self.k8s_namespace.apply_manifest(manifest)
else:
# Reraise for anything else.
raise

if len(k8s_objects) != 1:
raise _RunnerError('Expected exactly one object must created from '
f'manifest {template_file}')
Expand Down

0 comments on commit c0a4f62

Please sign in to comment.