diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
index 52d68f1324d4f..11d11ab3a01af 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -15,12 +15,61 @@
 # specific language governing permissions and limitations
 # under the License.
 """Client for kubernetes communication"""
+
+from typing import Optional
+
 from airflow.configuration import conf
 
 try:
     from kubernetes import config, client
     from kubernetes.client.rest import ApiException  # pylint: disable=unused-import
+    from kubernetes.client.api_client import ApiClient
+    from kubernetes.client import Configuration
+    from airflow.kubernetes.refresh_config import (  # pylint: disable=ungrouped-imports
+        load_kube_config,
+        RefreshConfiguration,
+    )
     has_kubernetes = True
+
+    def _get_kube_config(in_cluster: bool,
+                         cluster_context: Optional[str],
+                         config_file: Optional[str]) -> Optional[Configuration]:
+        """
+        Load the kubernetes client configuration.
+
+        :param in_cluster: when True, load the in-cluster configuration into the
+            client's default configuration
+        :param cluster_context: kubeconfig context to activate (out-of-cluster only)
+        :param config_file: path to the kubeconfig file (out-of-cluster only)
+        :return: None when the in-cluster (default) configuration was loaded,
+            otherwise a RefreshConfiguration populated from the kubeconfig file
+        """
+        if in_cluster:
+            # load_incluster_config sets the default configuration with config populated by k8s
+            config.load_incluster_config()
+            return None
+        # this block can be replaced with just config.load_kube_config once
+        # refresh_config module is replaced with upstream fix
+        cfg = RefreshConfiguration()
+        load_kube_config(
+            client_configuration=cfg, config_file=config_file, context=cluster_context)
+        return cfg
+
+    def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
+        """
+        This is a workaround for supporting api token refresh in k8s client.
+
+        The function can be replaced with `return client.CoreV1Api()` once the
+        upstream client supports token refresh.
+
+        :param cfg: configuration to build the client from; None selects the
+            client's default configuration
+        :return: core v1 API client
+        """
+        if cfg is not None:
+            return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
+        return client.CoreV1Api()
+
 except ImportError as e:
     # We need an exception class to be able to use it in ``except`` elsewhere
     # in the code base
@@ -29,19 +78,9 @@
     _import_err = e
 
 
-def _load_kube_config(in_cluster, cluster_context, config_file):
-    if not has_kubernetes:
-        raise _import_err
-    if in_cluster:
-        config.load_incluster_config()
-    else:
-        config.load_kube_config(config_file=config_file, context=cluster_context)
-    return client.CoreV1Api()
-
-
-def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'),
-                    cluster_context=None,
-                    config_file=None):
+def get_kube_client(in_cluster: bool = conf.getboolean('kubernetes', 'in_cluster'),
+                    cluster_context: Optional[str] = None,
+                    config_file: Optional[str] = None):
     """
     Retrieves Kubernetes client
 
@@ -54,9 +93,15 @@ def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'),
     :return kubernetes client
     :rtype client.CoreV1Api
     """
+
+    if not has_kubernetes:
+        raise _import_err
+
     if not in_cluster:
         if cluster_context is None:
             cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
         if config_file is None:
             config_file = conf.get('kubernetes', 'config_file', fallback=None)
-    return _load_kube_config(in_cluster, cluster_context, config_file)
+
+    client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
+    return _get_client_with_patched_configuration(client_conf)
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
new file mode 100644
index 0000000000000..b060d258ed19b
--- /dev/null
+++ b/airflow/kubernetes/refresh_config.py
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Kubernetes client configuration that refreshes expired exec-plugin api tokens.
+
+NOTE: this module can be removed once upstream client supports token refresh
+see: https://github.com/kubernetes-client/python/issues/741
+"""
+
+import calendar
+import logging
+import os
+import time
+from datetime import datetime
+
+import yaml
+from kubernetes.client import Configuration
+from kubernetes.config.exec_provider import ExecProvider
+from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader
+
+
+def _parse_timestamp(ts_str):
+    """
+    Convert an RFC3339 timestamp such as ``2019-12-05T18:00:00Z`` into seconds
+    since the epoch (UTC).
+    """
+    if ts_str.endswith('Z'):
+        # ``%z`` only accepts a literal ``Z`` from Python 3.7 onwards
+        ts_str = ts_str[:-1] + '+0000'
+    # utctimetuple normalises any UTC offset, which timegm requires
+    return calendar.timegm(datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z").utctimetuple())
+
+
+class RefreshKubeConfigLoader(KubeConfigLoader):
+    """
+    Patched KubeConfigLoader, this subclass takes expirationTimestamp into
+    account and sets api key refresh callback hook in Configuration object
+    """
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # epoch seconds at which the current token expires; None when unknown
+        self.api_key_expire_ts = None
+
+    def _load_from_exec_plugin(self):
+        """
+        We override _load_from_exec_plugin method to also read and store
+        expiration timestamp for aws-iam-authenticator. It will be later
+        used for api token refresh.
+
+        :return: True when a token was loaded, None otherwise
+        """
+        if 'exec' not in self._user:
+            return None
+        try:
+            status = ExecProvider(self._user['exec']).run()
+            if 'token' not in status:
+                logging.error('exec: missing token field in plugin output')
+                return None
+            self.token = "Bearer %s" % status['token']  # pylint: disable=W0201
+            ts_str = status.get('expirationTimestamp')
+            if ts_str:
+                self.api_key_expire_ts = _parse_timestamp(ts_str)
+            return True
+        except Exception as e:  # pylint: disable=W0703
+            logging.error(str(e))
+            return None
+
+    def refresh_api_key(self, client_configuration):
+        """
+        Refresh API key if expired
+
+        :param client_configuration: configuration object to reload the key into
+        """
+        if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts:
+            self.load_and_set(client_configuration)
+
+    def load_and_set(self, client_configuration):
+        """
+        Load the kube config into client_configuration and register the
+        refresh hook on it
+        """
+        super().load_and_set(client_configuration)
+        client_configuration.refresh_api_key = self.refresh_api_key
+
+
+class RefreshConfiguration(Configuration):
+    """
+    Patched Configuration, this subclass takes api key refresh callback hook
+    into account
+    """
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # callable taking this configuration, set by RefreshKubeConfigLoader.load_and_set
+        self.refresh_api_key = None
+
+    def get_api_key_with_prefix(self, identifier):
+        """
+        Run the refresh hook, if any, before returning the api key for identifier
+        """
+        if self.refresh_api_key:
+            self.refresh_api_key(self)  # pylint: disable=E1102
+        return super().get_api_key_with_prefix(identifier)
+
+
+def _get_kube_config_loader_for_yaml_file(filename, **kwargs):
+    """
+    Adapted from the upstream _get_kube_config_loader_for_yaml_file function, changed
+    KubeConfigLoader to RefreshKubeConfigLoader
+    """
+    with open(filename) as f:
+        return RefreshKubeConfigLoader(
+            config_dict=yaml.safe_load(f),
+            config_base_path=os.path.abspath(os.path.dirname(filename)),
+            **kwargs)
+
+
+def load_kube_config(client_configuration, config_file=None, context=None):
+    """
+    Adapted from the upstream load_kube_config function, changes:
+        - removed persist_config argument since it's not being used
+        - removed `client_configuration is None` branch since we always pass
+          in client configuration
+    """
+    if config_file is None:
+        config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)
+
+    loader = _get_kube_config_loader_for_yaml_file(
+        config_file, active_context=context, config_persister=None)
+
+    loader.load_and_set(client_configuration)
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
new file mode 100644
index 0000000000000..8e60ef49d542d
--- /dev/null
+++ b/tests/kubernetes/test_client.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.kubernetes.kube_client import RefreshConfiguration, get_kube_client
+from airflow.kubernetes.refresh_config import _parse_timestamp
+from tests.compat import mock
+
+
+class TestClient(unittest.TestCase):
+    """Tests for get_kube_client configuration selection"""
+
+    @mock.patch('airflow.kubernetes.kube_client.config')
+    def test_load_cluster_config(self, _):
+        """In-cluster clients must use the stock configuration"""
+        client = get_kube_client(in_cluster=True)
+        self.assertNotIsInstance(client.api_client.configuration, RefreshConfiguration)
+
+    @mock.patch('airflow.kubernetes.kube_client.config')
+    @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
+    def test_load_file_config(self, _, _2):
+        """Out-of-cluster clients must use the token-refreshing configuration"""
+        client = get_kube_client(in_cluster=False)
+        self.assertIsInstance(client.api_client.configuration, RefreshConfiguration)
+
+
+class TestParseTimestamp(unittest.TestCase):
+    """Tests for expirationTimestamp parsing"""
+
+    def test_utc_designator(self):
+        """A trailing Z is read as UTC"""
+        self.assertEqual(_parse_timestamp('1970-01-01T00:00:10Z'), 10)
+
+    def test_utc_offset(self):
+        """A numeric offset is normalised to UTC"""
+        self.assertEqual(_parse_timestamp('1970-01-01T01:00:10+0100'), 10)