Skip to content

Commit

Permalink
Merge pull request #26 from tomplus/feat/async_gcp_token
Browse files Browse the repository at this point in the history
feat: async refresh token, remove synchronous libs
  • Loading branch information
tomplus authored Jun 30, 2018
2 parents 3444267 + 5320d79 commit 1c15f8e
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 102 deletions.
2 changes: 1 addition & 1 deletion examples/example1.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
config.load_kube_config()
await config.load_kube_config()

v1 = client.CoreV1Api()
print("Listing pods with their IPs:")
Expand Down
2 changes: 1 addition & 1 deletion examples/example2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
config.load_kube_config()
await config.load_kube_config()

v1 = client.CoreV1Api()
count = 10
Expand Down
2 changes: 1 addition & 1 deletion examples/example3.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
config.load_kube_config()
await config.load_kube_config()

v1 = client.CoreV1Api()

Expand Down
9 changes: 5 additions & 4 deletions examples/example4.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@ async def watch_namespaces():
v1 = client.CoreV1Api()
async for event in watch.Watch().stream(v1.list_namespace):
etype, obj = event['type'], event['object']
print(f"{etype} namespace {obj.metadata.name}")
print("{} namespace {}".format(etype, obj.metadata.name))


async def watch_pods():
v1 = client.CoreV1Api()
async for event in watch.Watch().stream(v1.list_pod_for_all_namespaces):
evt, obj = event['type'], event['object']
print(f"{evt} pod {obj.metadata.name} in NS {obj.metadata.namespace}")
print("{} pod {} in NS {}".format(evt, obj.metadata.name, obj.metadata.namespace))


def main():
loop = asyncio.get_event_loop()

# Load the kubeconfig file specified in the KUBECONFIG environment
# variable, or fall back to `~/.kube/config`.
config.load_kube_config()
loop.run_until_complete(config.load_kube_config())

# Define the tasks to watch namespaces and pods.
tasks = [
Expand All @@ -30,7 +32,6 @@ def main():
]

# Push tasks into event loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Expand Down
6 changes: 5 additions & 1 deletion examples/tail.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async def print_pod_log(pod, namespace, container, lines, follow):
async def main():
args = parse_args()

config.load_kube_config()
loader = await config.load_kube_config()

v1 = client.CoreV1Api()
ret = await v1.list_namespaced_pod(args.namespace)
Expand All @@ -69,6 +69,10 @@ async def main():
print('No matching PODs !')
return

if args.follow:
# autorefresh gcp token
cmd.append(config.refresh_token(loader))

await asyncio.wait(cmd)


Expand Down
2 changes: 1 addition & 1 deletion kubernetes_asyncio/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
from .config_exception import ConfigException
from .incluster_config import load_incluster_config
from .kube_config import (list_kube_config_contexts, load_kube_config,
new_client_from_config)
new_client_from_config, refresh_token)
27 changes: 27 additions & 0 deletions kubernetes_asyncio/config/google_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio.subprocess
import json
import shlex
from types import SimpleNamespace


async def google_auth_credentials(provider):

if 'cmd-path' not in provider or 'cmd-args' not in provider:
raise ValueError('GoogleAuth via gcloud is supported! Values for cmd-path, cmd-args are required.')

cmd = shlex.split(' '.join((provider['cmd-path'], provider['cmd-args'])))
cmd_exec = asyncio.create_subprocess_exec(*cmd,
stdin=None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
proc = await cmd_exec

data = await proc.stdout.read()
data = data.decode('ascii').rstrip()
data = json.loads(data)

await proc.wait()
return SimpleNamespace(
token=data['credential']['access_token'],
expiry=data['credential']['token_expiry']
)
42 changes: 42 additions & 0 deletions kubernetes_asyncio/config/google_auth_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from asynctest import TestCase, main

from .google_auth import google_auth_credentials


class TestGoogleAuth(TestCase):

async def test_google_auth_credentials(self):

provider = {
'cmd-path': '/bin/echo',
'cmd-args': '{\\"credential\\": {\\"access_token\\": \\"token\\", '
'\\"token_expiry\\": \\"2001.01.01T00:00:00Z\\"}}'
}

ret = await google_auth_credentials(provider)

self.assertEqual(ret.token, 'token')
self.assertEqual(ret.expiry, '2001.01.01T00:00:00Z')

async def test_google_auth_credentials_exception(self):

with self.assertRaisesRegex(ValueError, "cmd-path, cmd-args are required."):
await google_auth_credentials({})


if __name__ == '__main__':
main()
127 changes: 72 additions & 55 deletions kubernetes_asyncio/config/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import atexit
import base64
import datetime
import os
import tempfile

import google.auth
import google.auth.transport.requests
import urllib3
import yaml

from kubernetes_asyncio.client import ApiClient, Configuration

from .config_exception import ConfigException
from .dateutil import UTC, format_rfc3339, parse_rfc3339
from .dateutil import UTC, parse_rfc3339
from .google_auth import google_auth_credentials

EXPIRY_SKEW_PREVENTION_DELAY = datetime.timedelta(minutes=5)
KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config')
Expand Down Expand Up @@ -59,7 +59,7 @@ def _create_temp_file_with_content(content):


def _is_expired(expiry):
return ((parse_rfc3339(expiry) + EXPIRY_SKEW_PREVENTION_DELAY) <=
return ((parse_rfc3339(expiry) - EXPIRY_SKEW_PREVENTION_DELAY) <=
datetime.datetime.utcnow().replace(tzinfo=UTC))


Expand Down Expand Up @@ -124,22 +124,14 @@ def __init__(self, config_dict, active_context=None,
self._current_context = None
self._user = None
self._cluster = None
self.provider = None
self.set_active_context(active_context)
self._config_base_path = config_base_path
self._config_persister = config_persister

def _refresh_credentials():
credentials, project_id = google.auth.default(
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
request = google.auth.transport.requests.Request()
credentials.refresh(request)
return credentials

if get_google_credentials:
self._get_google_credentials = get_google_credentials
else:
self._get_google_credentials = _refresh_credentials
self._get_google_credentials = None

def set_active_context(self, context_name=None):
if context_name is None:
Expand All @@ -158,8 +150,10 @@ def set_active_context(self, context_name=None):
self._user = None
self._cluster = self._config['clusters'].get_with_name(
self._current_context['context']['cluster'])['cluster']
if self._user is not None and 'auth-provider' in self._user and 'name' in self._user['auth-provider']:
self.provider = self._user['auth-provider']['name']

def _load_authentication(self):
async def _load_authentication(self):
"""Read authentication from kube-config user section if exists.
This function goes through various authentication methods in user
Expand All @@ -173,40 +167,39 @@ def _load_authentication(self):
"""
if not self._user:
return
if self._load_gcp_token():

if self.provider == 'gcp':
await self.load_gcp_token()
return

if self._load_user_token():
return
self._load_user_pass_token()

def _load_gcp_token(self):
if 'auth-provider' not in self._user:
return
provider = self._user['auth-provider']
if 'name' not in provider:
return
if provider['name'] != 'gcp':
return

if (('config' not in provider) or
('access-token' not in provider['config']) or
('expiry' in provider['config'] and
_is_expired(provider['config']['expiry']))):
# token is not available or expired, refresh it
self._refresh_gcp_token()
async def load_gcp_token(self):

self.token = "Bearer %s" % provider['config']['access-token']
return self.token

def _refresh_gcp_token(self):
if 'config' not in self._user['auth-provider']:
self._user['auth-provider'].value['config'] = {}
provider = self._user['auth-provider']['config']
credentials = self._get_google_credentials()
provider.value['access-token'] = credentials.token
provider.value['expiry'] = format_rfc3339(credentials.expiry)
if self._config_persister:
self._config_persister(self._config.value)

config = self._user['auth-provider']['config']

if (('access-token' not in config) or
('expiry' in config and _is_expired(config['expiry']))):

if self._get_google_credentials is not None:
if asyncio.iscoroutinefunction(self._get_google_credentials):
credentials = await self._get_google_credentials()
else:
credentials = self._get_google_credentials()
else:
credentials = await google_auth_credentials(config)
config.value['access-token'] = credentials.token
config.value['expiry'] = credentials.expiry
if self._config_persister:
self._config_persister(self._config.value)

self.token = "Bearer %s" % config['access-token']
return self.token

def _load_user_token(self):
token = FileOrData(
Expand Down Expand Up @@ -241,16 +234,18 @@ def _load_cluster_info(self):
self.verify_ssl = not self._cluster['insecure-skip-tls-verify']

def _set_config(self, client_configuration):

if 'token' in self.__dict__:
client_configuration.api_key['authorization'] = self.token

# copy these keys directly from self to configuration object
keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
for key in keys:
if key in self.__dict__:
setattr(client_configuration, key, getattr(self, key))

def load_and_set(self, client_configuration):
self._load_authentication()
async def load_and_set(self, client_configuration):
await self._load_authentication()
self._load_cluster_info()
self._set_config(client_configuration)

Expand Down Expand Up @@ -339,9 +334,9 @@ def list_kube_config_contexts(config_file=None):
return loader.list_contexts(), loader.current_context


def load_kube_config(config_file=None, context=None,
client_configuration=None,
persist_config=True):
async def load_kube_config(config_file=None, context=None,
client_configuration=None,
persist_config=True):
"""Loads authentication and cluster information from kube-config file
and stores them in kubernetes.client.configuration.
Expand Down Expand Up @@ -369,21 +364,43 @@ def _save_kube_config(config_map):
config_persister=config_persister)
if client_configuration is None:
config = type.__call__(Configuration)
loader.load_and_set(config)
await loader.load_and_set(config)
Configuration.set_default(config)
else:
loader.load_and_set(client_configuration)
await loader.load_and_set(client_configuration)

return loader


def new_client_from_config(
config_file=None,
context=None,
persist_config=True):
async def refresh_token(loader, client_configuration=None, interval=60):
"""Refresh token if necessary, updates the token in client configurarion
:param loader: KubeConfigLoader returned by load_kube_config
:param client_configuration: The kubernetes.client.Configuration to
set configs to.
:param interval: how often check if token is up-to-date
"""
if loader.provider != 'gcp':
return

if client_configuration is None:
client_configuration = Configuration()

while 1:
await asyncio.sleep(interval)
await loader.load_gcp_token()
client_configuration.api_key['authorization'] = loader.token


async def new_client_from_config(config_file=None, context=None, persist_config=True):
"""Loads configuration the same as load_kube_config but returns an ApiClient
to be used with any API object. This will allow the caller to concurrently
talk with multiple clusters."""
client_config = type.__call__(Configuration)
load_kube_config(config_file=config_file, context=context,
client_configuration=client_config,
persist_config=persist_config)

await load_kube_config(config_file=config_file, context=context,
client_configuration=client_config,
persist_config=persist_config)

return ApiClient(configuration=client_config)
Loading

0 comments on commit 1c15f8e

Please sign in to comment.