Skip to content

Commit

Permalink
Remove RefreshConfiguration workaround for K8s token refreshing (#20759)
Browse files Browse the repository at this point in the history
A workaround was added (apache/airflow#5731) to handle the refreshing of EKS tokens.  It was necessary because of an upstream bug.  It has since been fixed (kubernetes-client/python-base@70b78cd) and released in v21.7.0 (https://github.com/kubernetes-client/python/blob/master/CHANGELOG.md#v2170).

GitOrigin-RevId: 7bd165fbe2cbbfa8208803ec352c5d16ca2bd3ec
  • Loading branch information
dstandish authored and Cloud Composer Team committed Jan 27, 2023
1 parent 1cb7e94 commit 182aecf
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 287 deletions.
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ https://developers.google.com/style/inclusive-documentation
-->

### Minimum kubernetes version bumped from 3.0.0 to 21.7.0

No change in behavior is expected. This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759).

### Deprecation: `Connection.extra` must be JSON-encoded dict

#### TLDR
Expand Down
47 changes: 10 additions & 37 deletions airflow/kubernetes/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,10 @@
try:
from kubernetes import client, config
from kubernetes.client import Configuration
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config

has_kubernetes = True

def _get_kube_config(
in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
) -> Optional[Configuration]:
if in_cluster:
# load_incluster_config set default configuration with config populated by k8s
config.load_incluster_config()
return None
else:
# this block can be replaced with just config.load_kube_config once
# refresh_config module is replaced with upstream fix
cfg = RefreshConfiguration()
load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
return cfg

def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
"""
This is a workaround for supporting api token refresh in k8s client.
The function can be replace with `return client.CoreV1Api()` once the
upstream client supports token refresh.
"""
if cfg:
return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
else:
return client.CoreV1Api()

def _disable_verify_ssl() -> None:
configuration = Configuration()
configuration.verify_ssl = False
Expand Down Expand Up @@ -126,17 +97,19 @@ def get_kube_client(
if not has_kubernetes:
raise _import_err

if not in_cluster:
if cluster_context is None:
cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
if config_file is None:
config_file = conf.get('kubernetes', 'config_file', fallback=None)

if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
_enable_tcp_keepalive()

if not conf.getboolean('kubernetes', 'verify_ssl'):
_disable_verify_ssl()

client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
return _get_client_with_patched_configuration(client_conf)
if in_cluster:
config.load_incluster_config()
else:
if cluster_context is None:
cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
if config_file is None:
config_file = conf.get('kubernetes', 'config_file', fallback=None)
config.load_kube_config(config_file=config_file, context=cluster_context)

return client.CoreV1Api()
124 changes: 0 additions & 124 deletions airflow/kubernetes/refresh_config.py

This file was deleted.

8 changes: 2 additions & 6 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
try:
# Kube >= 19
from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList
except ImportError:
from kubernetes.client.models.v1_event_list import V1EventList
from kubernetes.client.models.core_v1_event_list import CoreV1EventList


class PodLaunchFailedException(AirflowException):
Expand Down Expand Up @@ -298,7 +294,7 @@ def read_pod_logs(
raise

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def read_pod_events(self, pod: V1Pod) -> "V1EventList":
def read_pod_events(self, pod: V1Pod) -> "CoreV1EventList":
"""Reads events from the POD"""
try:
return self._client.list_namespaced_event(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
]
kubernetes = [
'cryptography>=2.0.0',
'kubernetes>=3.0.0',
'kubernetes>=21.7.0',
]
kylin = ['kylinpy>=2.6']
ldap = [
Expand Down
22 changes: 9 additions & 13 deletions tests/kubernetes/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,21 @@
from kubernetes.client import Configuration
from urllib3.connection import HTTPConnection, HTTPSConnection

from airflow.kubernetes.kube_client import (
RefreshConfiguration,
_disable_verify_ssl,
_enable_tcp_keepalive,
get_kube_client,
)
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client


class TestClient(unittest.TestCase):
@mock.patch('airflow.kubernetes.kube_client.config')
def test_load_cluster_config(self, _):
client = get_kube_client(in_cluster=True)
assert not isinstance(client.api_client.configuration, RefreshConfiguration)
def test_load_cluster_config(self, config):
get_kube_client(in_cluster=True)
assert config.load_incluster_config.called
assert config.load_kube_config.not_called

@mock.patch('airflow.kubernetes.kube_client.config')
@mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
def test_load_file_config(self, _, _2):
client = get_kube_client(in_cluster=False)
assert isinstance(client.api_client.configuration, RefreshConfiguration)
def test_load_file_config(self, config):
get_kube_client(in_cluster=False)
assert config.load_incluster_config.not_called
assert config.load_kube_config.called

def test_enable_tcp_keepalive(self):
socket_options = [
Expand Down
106 changes: 0 additions & 106 deletions tests/kubernetes/test_refresh_config.py

This file was deleted.

0 comments on commit 182aecf

Please sign in to comment.