Skip to content

Commit

Permalink
Remove RefreshConfiguration workaround for K8s token refreshing
Browse files Browse the repository at this point in the history
A workaround was added (apache#5731) to handle the refreshing of EKS tokens.  It was necessary because of an upstream bug.  It has since been fixed (kubernetes-client/python-base@70b78cd) and released in v21.7.0 (https://github.com/kubernetes-client/python/blob/master/CHANGELOG.md#v2170).
  • Loading branch information
dstandish committed Mar 16, 2022
1 parent df6058c commit 574c63e
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 278 deletions.
47 changes: 10 additions & 37 deletions airflow/kubernetes/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,10 @@
try:
from kubernetes import client, config
from kubernetes.client import Configuration
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config

has_kubernetes = True

def _get_kube_config(
in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
) -> Optional[Configuration]:
if in_cluster:
# load_incluster_config set default configuration with config populated by k8s
config.load_incluster_config()
return None
else:
# this block can be replaced with just config.load_kube_config once
# refresh_config module is replaced with upstream fix
cfg = RefreshConfiguration()
load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
return cfg

def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
"""
This is a workaround for supporting api token refresh in k8s client.
The function can be replace with `return client.CoreV1Api()` once the
upstream client supports token refresh.
"""
if cfg:
return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
else:
return client.CoreV1Api()

def _disable_verify_ssl() -> None:
configuration = Configuration()
configuration.verify_ssl = False
Expand Down Expand Up @@ -126,17 +97,19 @@ def get_kube_client(
if not has_kubernetes:
raise _import_err

if not in_cluster:
if cluster_context is None:
cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
if config_file is None:
config_file = conf.get('kubernetes', 'config_file', fallback=None)

if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
_enable_tcp_keepalive()

if not conf.getboolean('kubernetes', 'verify_ssl'):
_disable_verify_ssl()

client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
return _get_client_with_patched_configuration(client_conf)
if in_cluster:
config.load_incluster_config()
else:
if cluster_context is None:
cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
if config_file is None:
config_file = conf.get('kubernetes', 'config_file', fallback=None)
config.load_kube_config(config_file=config_file, context=cluster_context)

return client.CoreV1Api()
124 changes: 0 additions & 124 deletions airflow/kubernetes/refresh_config.py

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
]
kubernetes = [
'cryptography>=2.0.0',
'kubernetes>=3.0.0',
'kubernetes>=21.7.0',
]
kylin = ['kylinpy>=2.6']
ldap = [
Expand Down
14 changes: 4 additions & 10 deletions tests/kubernetes/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,19 @@
from kubernetes.client import Configuration
from urllib3.connection import HTTPConnection, HTTPSConnection

from airflow.kubernetes.kube_client import (
RefreshConfiguration,
_disable_verify_ssl,
_enable_tcp_keepalive,
get_kube_client,
)
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client


class TestClient(unittest.TestCase):
@mock.patch('airflow.kubernetes.kube_client.config')
def test_load_cluster_config(self, _):
client = get_kube_client(in_cluster=True)
assert not isinstance(client.api_client.configuration, RefreshConfiguration)
assert not isinstance(client.api_client.configuration, Configuration)

@mock.patch('airflow.kubernetes.kube_client.config')
@mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
def test_load_file_config(self, _, _2):
def test_load_file_config(self, _):
client = get_kube_client(in_cluster=False)
assert isinstance(client.api_client.configuration, RefreshConfiguration)
assert isinstance(client.api_client.configuration, Configuration)

def test_enable_tcp_keepalive(self):
socket_options = [
Expand Down
106 changes: 0 additions & 106 deletions tests/kubernetes/test_refresh_config.py

This file was deleted.

0 comments on commit 574c63e

Please sign in to comment.