diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index f38c4c8a..864969b2 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -25,7 +25,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: "3.9" - name: install build package run: | diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 87c74c57..eede51a7 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -38,41 +38,24 @@ jobs: # gain meaning on how job steps use them. # # k3s-channel: https://update.k3s.io/v1-release/channels - # k8s-python-client: https://github.com/kubernetes-client/python#compatibility + # kubernetes_asyncio: https://github.com/tomplus/kubernetes_asyncio/tags # include: - # Tests with various k8s-python-client versions - - python: 3.7 - k3s: v1.16 - k8s-python-client: 10.* # supports k8s 1.14 - - python: 3.8 - k3s: v1.16 - k8s-python-client: 11.* # supports k8s 1.15 - - python: 3.9 - k3s: v1.16 - k8s-python-client: 12.* # supports k8s 1.16 - - python: 3.9 + # Tests with oldest supported Python, k8s, and k8s client + - python: "3.7" k3s: v1.17 - k8s-python-client: 17.* # supports k8s 1.17 + test_dependencies: kubernetes_asyncio==19.* - # Test with pre-releases of k8s-python-client - - python: 3.9 - k3s: v1.18 - k8s-python-client: pre - - # Test with various recent k8s versions - - python: 3.8 + # Test with modern python and k8s versions + - python: "3.9" k3s: stable - k8s-python-client: 17.* - - python: 3.8 + - python: "3.9" k3s: latest - k8s-python-client: 17.* - # Test with JupyterHub in main branch - - python: 3.8 - k3s: v1.16 - k8s-python-client: 17.* - jupyterhub: main + # Test with latest python and JupyterHub in main branch + - python: "3.10" + k3s: latest + test_dependencies: git+https://github.com/jupyterhub/jupyterhub steps: - uses: actions/checkout@v2 @@ -81,31 +64,11 @@ jobs: python-version: "${{ matrix.python }}" - name: Install package and test dependencies - env: - K8S_PYTHON_CLIENT_VERSION: "${{ matrix.k8s-python-client }}" - JUPYTERHUB_VERSION: ${{ matrix.jupyterhub }} run: | - if [[ "$K8S_PYTHON_CLIENT_VERSION" == "pre" ]]; then - PRE="--pre" - PINS="" - else - PRE="" - PINS="kubernetes==${K8S_PYTHON_CLIENT_VERSION}" - fi - if [ "$JUPYTERHUB_VERSION" == "main" ]; then - PINS="$PINS git+https://github.com/jupyterhub/jupyterhub" - fi - - pip install --upgrade setuptools pip - pip install -e ".[test]" ${PRE} ${PINS} + pip install --upgrade pip + pip install -e ".[test]" ${{ matrix.test_dependencies }} pip freeze - # flake8 runs a very quick code analysis without running any of the code - # it analyses - - name: Run flake8 - run: | - flake8 kubespawner - # Starts a k8s cluster with NetworkPolicy enforcement and installs both # kubectl and helm. We won't need network policy enforcement or helm # though. @@ -141,7 +104,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: "3.9" - name: install build package run: | diff --git a/kubespawner/clients.py b/kubespawner/clients.py index cd0dc6a6..3ed378ab 100644 --- a/kubespawner/clients.py +++ b/kubespawner/clients.py @@ -1,20 +1,26 @@ -"""Shared clients for kubernetes +"""Configures and instantiates REST API clients of various kinds to +communicate with a Kubernetes api-server, but only one instance per kind is +instantiated. 
-avoids creating multiple kubernetes client objects, -each of which spawns an unused max-size thread pool +The instances of these REST API clients are also patched to avoid the creation +of unused threads. """ import weakref from unittest.mock import Mock -import kubernetes.client -from kubernetes.client import api_client +import kubernetes_asyncio.client +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client import Configuration -# FIXME: remove when instantiating a kubernetes client -# doesn't create N-CPUs threads unconditionally. -# monkeypatch threadpool in kubernetes api_client -# to avoid instantiating ThreadPools. -# This is known to work for kubernetes-4.0 -# and may need updating with later kubernetes clients +# FIXME: Remove this workaround when instantiating a k8s client doesn't +# automatically create a ThreadPool with 1 thread that we won't use +# anyhow. To know if that has happened, reading +# https://github.com/jupyterhub/kubespawner/issues/567 may be helpful. +# +# The workaround is to monkeypatch ThreadPool in the kubernetes +# api_client to avoid ThreadPools. This is known to work with both +# `kubernetes` and `kubernetes_asyncio`. +# _dummy_pool = Mock() api_client.ThreadPool = lambda *args, **kwargs: _dummy_pool @@ -22,7 +28,8 @@ def shared_client(ClientType, *args, **kwargs): - """Return a single shared kubernetes client instance + """Return a shared kubernetes client instance + based on the provided arguments. A weak reference to the instance is cached, so that concurrent calls to shared_client @@ -38,11 +45,38 @@ def shared_client(ClientType, *args, **kwargs): client = _client_cache[cache_key]() if client is None: - # Kubernetes client configuration is handled globally - # in kubernetes.py and is already called in spawner.py - # or proxy.py prior to a shared_client being instantiated - Client = getattr(kubernetes.client, ClientType) + # Kubernetes client configuration is handled globally and should already + # be configured from spawner.py or proxy.py via the load_config function + # prior to a shared_client being instantiated. + Client = getattr(kubernetes_asyncio.client, ClientType) client = Client(*args, **kwargs) # cache weakref so that clients can be garbage collected _client_cache[cache_key] = weakref.ref(client) + return client + + +async def load_config(caller): + """ + Loads global configuration for the Python client we use to communicate with + a Kubernetes API server, and optionally tweaks that configuration based on + specific settings on the passed caller object. + + This needs to be called before creating a kubernetes client, so practically + before the shared_client function is called. The caller must have both the + k8s_api_ssl_ca_cert and k8s_api_host attributes. KubeSpawner and + KubeIngressProxy both have these attributes. 
+ """ + try: + kubernetes_asyncio.config.load_incluster_config() + except kubernetes_asyncio.config.ConfigException: + await kubernetes_asyncio.config.load_kube_config() + + if caller.k8s_api_ssl_ca_cert: + global_conf = Configuration.get_default_copy() + global_conf.ssl_ca_cert = caller.k8s_api_ssl_ca_cert + Configuration.set_default(global_conf) + if caller.k8s_api_host: + global_conf = Configuration.get_default_copy() + global_conf.host = caller.k8s_api_host + Configuration.set_default(global_conf) diff --git a/kubespawner/objects.py b/kubespawner/objects.py index 800f1c27..787d8afc 100644 --- a/kubespawner/objects.py +++ b/kubespawner/objects.py @@ -9,50 +9,49 @@ import re from urllib.parse import urlparse -from kubernetes.client.models import V1Affinity -from kubernetes.client.models import V1Container -from kubernetes.client.models import V1ContainerPort -from kubernetes.client.models import V1EndpointAddress -from kubernetes.client.models import V1Endpoints -from kubernetes.client.models import V1EndpointSubset -from kubernetes.client.models import V1EnvVar -from kubernetes.client.models import V1LabelSelector -from kubernetes.client.models import V1Lifecycle -from kubernetes.client.models import V1LocalObjectReference -from kubernetes.client.models import V1Namespace -from kubernetes.client.models import V1NodeAffinity -from kubernetes.client.models import V1NodeSelector -from kubernetes.client.models import V1NodeSelectorRequirement -from kubernetes.client.models import V1NodeSelectorTerm -from kubernetes.client.models import V1ObjectMeta -from kubernetes.client.models import V1OwnerReference -from kubernetes.client.models import V1PersistentVolumeClaim -from kubernetes.client.models import V1PersistentVolumeClaimSpec -from kubernetes.client.models import V1Pod -from kubernetes.client.models import V1PodAffinity -from kubernetes.client.models import V1PodAffinityTerm -from kubernetes.client.models import V1PodAntiAffinity -from kubernetes.client.models import V1PodSecurityContext -from kubernetes.client.models import V1PodSpec -from kubernetes.client.models import V1PreferredSchedulingTerm -from kubernetes.client.models import V1ResourceRequirements -from kubernetes.client.models import V1Secret -from kubernetes.client.models import V1SecurityContext -from kubernetes.client.models import V1Service -from kubernetes.client.models import V1ServicePort -from kubernetes.client.models import V1ServiceSpec -from kubernetes.client.models import V1Toleration -from kubernetes.client.models import V1Volume -from kubernetes.client.models import V1VolumeMount -from kubernetes.client.models import V1WeightedPodAffinityTerm +from kubernetes_asyncio.client.models import V1Affinity +from kubernetes_asyncio.client.models import V1Container +from kubernetes_asyncio.client.models import V1ContainerPort +from kubernetes_asyncio.client.models import V1EndpointAddress +from kubernetes_asyncio.client.models import V1Endpoints +from kubernetes_asyncio.client.models import V1EndpointSubset +from kubernetes_asyncio.client.models import V1EnvVar +from kubernetes_asyncio.client.models import V1LabelSelector +from kubernetes_asyncio.client.models import V1Lifecycle +from kubernetes_asyncio.client.models import V1LocalObjectReference +from kubernetes_asyncio.client.models import V1Namespace +from kubernetes_asyncio.client.models import V1NodeAffinity +from kubernetes_asyncio.client.models import V1NodeSelector +from kubernetes_asyncio.client.models import V1NodeSelectorRequirement +from 
kubernetes_asyncio.client.models import V1NodeSelectorTerm +from kubernetes_asyncio.client.models import V1ObjectMeta +from kubernetes_asyncio.client.models import V1OwnerReference +from kubernetes_asyncio.client.models import V1PersistentVolumeClaim +from kubernetes_asyncio.client.models import V1PersistentVolumeClaimSpec +from kubernetes_asyncio.client.models import V1Pod +from kubernetes_asyncio.client.models import V1PodAffinity +from kubernetes_asyncio.client.models import V1PodAffinityTerm +from kubernetes_asyncio.client.models import V1PodAntiAffinity +from kubernetes_asyncio.client.models import V1PodSecurityContext +from kubernetes_asyncio.client.models import V1PodSpec +from kubernetes_asyncio.client.models import V1PreferredSchedulingTerm +from kubernetes_asyncio.client.models import V1ResourceRequirements +from kubernetes_asyncio.client.models import V1Secret +from kubernetes_asyncio.client.models import V1SecurityContext +from kubernetes_asyncio.client.models import V1Service +from kubernetes_asyncio.client.models import V1ServicePort +from kubernetes_asyncio.client.models import V1ServiceSpec +from kubernetes_asyncio.client.models import V1Toleration +from kubernetes_asyncio.client.models import V1Volume +from kubernetes_asyncio.client.models import V1VolumeMount +from kubernetes_asyncio.client.models import V1WeightedPodAffinityTerm try: - from kubernetes.client.models import CoreV1EndpointPort + from kubernetes_asyncio.client.models import CoreV1EndpointPort except ImportError: - from kubernetes.client.models import V1EndpointPort as CoreV1EndpointPort + from kubernetes_asyncio.client.models import V1EndpointPort as CoreV1EndpointPort -from kubespawner.utils import get_k8s_model -from kubespawner.utils import update_k8s_model +from kubespawner.utils import get_k8s_model, update_k8s_model def make_pod( @@ -734,23 +733,33 @@ def make_ingress(name, routespec, target, labels, data): # to keep compatibility with older K8S versions try: - from kubernetes.client.models import ( - ExtensionsV1beta1Ingress, - ExtensionsV1beta1IngressSpec, - ExtensionsV1beta1IngressRule, - ExtensionsV1beta1HTTPIngressRuleValue, + from kubernetes_asyncio.client.models import ( ExtensionsV1beta1HTTPIngressPath, + ExtensionsV1beta1HTTPIngressRuleValue, + ExtensionsV1beta1Ingress, ExtensionsV1beta1IngressBackend, + ExtensionsV1beta1IngressRule, + ExtensionsV1beta1IngressSpec, ) except ImportError: - from kubernetes.client.models import ( - V1beta1Ingress as ExtensionsV1beta1Ingress, - V1beta1IngressSpec as ExtensionsV1beta1IngressSpec, - V1beta1IngressRule as ExtensionsV1beta1IngressRule, - V1beta1HTTPIngressRuleValue as ExtensionsV1beta1HTTPIngressRuleValue, + from kubernetes_asyncio.client.models import ( V1beta1HTTPIngressPath as ExtensionsV1beta1HTTPIngressPath, + ) + from kubernetes_asyncio.client.models import ( + V1beta1HTTPIngressRuleValue as ExtensionsV1beta1HTTPIngressRuleValue, + ) + from kubernetes_asyncio.client.models import ( + V1beta1Ingress as ExtensionsV1beta1Ingress, + ) + from kubernetes_asyncio.client.models import ( V1beta1IngressBackend as ExtensionsV1beta1IngressBackend, ) + from kubernetes_asyncio.client.models import ( + V1beta1IngressRule as ExtensionsV1beta1IngressRule, + ) + from kubernetes_asyncio.client.models import ( + V1beta1IngressSpec as ExtensionsV1beta1IngressSpec, + ) meta = V1ObjectMeta( name=name, diff --git a/kubespawner/proxy.py b/kubespawner/proxy.py index b52e8253..b019c412 100644 --- a/kubespawner/proxy.py +++ b/kubespawner/proxy.py @@ -1,17 +1,16 @@ +import 
asyncio +import functools import json import os import string -from concurrent.futures import ThreadPoolExecutor import escapism -import kubernetes.config from jupyterhub.proxy import Proxy from jupyterhub.utils import exponential_backoff -from kubernetes import client -from tornado import gen -from tornado.concurrent import run_on_executor +from kubernetes_asyncio import client from traitlets import Unicode +from .clients import load_config from .clients import shared_client from .objects import make_ingress from .reflector import ResourceReflector @@ -82,10 +81,10 @@ def _namespace_default(self): config=True, help=""" Location (absolute filepath) for CA certs of the k8s API server. - - Typically this is unnecessary, CA certs are picked up by + + Typically this is unnecessary, CA certs are picked up by config.load_incluster_config() or config.load_kube_config. - + In rare non-standard cases, such as using custom intermediate CA for your cluster, you may need to mount root CA's elsewhere in your Pod/Container and point this variable to that filepath @@ -97,8 +96,8 @@ def _namespace_default(self): config=True, help=""" Full host name of the k8s API server ("https://hostname:port"). - - Typically this is unnecessary, the hostname is picked up by + + Typically this is unnecessary, the hostname is picked up by config.load_incluster_config() or config.load_kube_config. """, ) @@ -106,14 +105,36 @@ def _namespace_default(self): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - # We use the maximum number of concurrent user server starts (and thus proxy adds) - # as our threadpool maximum. This ensures that contention here does not become - # an accidental bottleneck. Since we serialize our create operations, we only - # need 1x concurrent_spawn_limit, not 3x. - self.executor = ThreadPoolExecutor(max_workers=self.app.concurrent_spawn_limit) + # Schedules async initialization logic that is to be awaited by async + # functions by decorating them with @_await_async_init. + self._async_init_future = asyncio.ensure_future(self._async_init()) - # Global configuration before reflector.py code runs - self._set_k8s_client_configuration() + async def _async_init(self): + """ + This method is scheduled to run from `__init__`, but not awaited there + as `__init__` can't be marked as async. + + Since JupyterHub won't await this method, we ensure the async methods + JupyterHub may call on this object will await this method before + continuing. To do this, we decorate them with `_await_async_init`. + + But, how do we figure out the methods to decorate? Likely only those + exposed by the base class that JupyterHub would know about. The base + class is Proxy, as declared in proxy.py: + https://github.com/jupyterhub/jupyterhub/blob/HEAD/jupyterhub/proxy.py. 
+ + From the Proxy class docstring we can conclude that the following + methods, if implemented, could be what we need to decorate with + _await_async_init: + + - get_all_routes (implemented and decorated) + - add_route (implemented and decorated) + - delete_route (implemented and decorated) + - start + - stop + - get_route + """ + await load_config(caller=self) self.core_api = shared_client('CoreV1Api') self.extension_api = shared_client('ExtensionsV1beta1Api') @@ -128,32 +149,26 @@ def __init__(self, *args, **kwargs): parent=self, namespace=self.namespace, labels=labels ) self.endpoint_reflector = EndpointsReflector( - parent=self, namespace=self.namespace, labels=labels + self, namespace=self.namespace, labels=labels + ) + await asyncio.gather( + self.ingress_reflector.start(), + self.service_reflector.start(), + self.endpoint_reflector.start(), ) - def _set_k8s_client_configuration(self): - # The actual (singleton) Kubernetes client will be created - # in clients.py shared_client but the configuration - # for token / ca_cert / k8s api host is set globally - # in kubernetes.py syntax. It is being set here - # and this method called prior to shared_client - # for readability / coupling with traitlets values - try: - kubernetes.config.load_incluster_config() - except kubernetes.config.ConfigException: - kubernetes.config.load_kube_config() - if self.k8s_api_ssl_ca_cert: - global_conf = client.Configuration.get_default_copy() - global_conf.ssl_ca_cert = self.k8s_api_ssl_ca_cert - client.Configuration.set_default(global_conf) - if self.k8s_api_host: - global_conf = client.Configuration.get_default_copy() - global_conf.host = self.k8s_api_host - client.Configuration.set_default(global_conf) - - @run_on_executor - def asynchronize(self, method, *args, **kwargs): - return method(*args, **kwargs) + def _await_async_init(method): + """A decorator to await the _async_init method after having been + scheduled to run in the `__init__` method.""" + + @functools.wraps(method) + async def async_method(self, *args, **kwargs): + if self._async_init_future is not None: + await self._async_init_future + self._async_init_future = None + return await method(self, *args, **kwargs) + + return async_method def safe_name_for_routespec(self, routespec): safe_chars = set(string.ascii_lowercase + string.digits) @@ -173,10 +188,12 @@ async def delete_if_exists(self, kind, safe_name, future): raise self.log.warn("Could not delete %s/%s: does not exist", kind, safe_name) + @_await_async_init async def add_route(self, routespec, target, data): # Create a route with the name being escaped routespec # Use full routespec in label # 'data' is JSON encoded and put in an annotation - we don't need to query for it + safe_name = self.safe_name_for_routespec(routespec).lower() labels = { 'heritage': 'jupyterhub', @@ -189,9 +206,7 @@ async def add_route(self, routespec, target, data): async def ensure_object(create_func, patch_func, body, kind): try: - resp = await self.asynchronize( - create_func, namespace=self.namespace, body=body - ) + resp = await create_func(namespace=self.namespace, body=body) self.log.info('Created %s/%s', kind, safe_name) except client.rest.ApiException as e: if e.status == 409: @@ -199,8 +214,7 @@ async def ensure_object(create_func, patch_func, body, kind): self.log.warn( "Trying to patch %s/%s, it already exists", kind, safe_name ) - resp = await self.asynchronize( - patch_func, + resp = await patch_func( namespace=self.namespace, body=body, name=body.metadata.name, @@ -222,8 +236,7 @@ async def 
ensure_object(create_func, patch_func, body, kind): 'Could not find endpoints/%s after creating it' % safe_name, ) else: - delete_endpoint = self.asynchronize( - self.core_api.delete_namespaced_endpoints, + delete_endpoint = await self.core_api.delete_namespaced_endpoints( name=safe_name, namespace=self.namespace, body=client.V1DeleteOptions(grace_period_seconds=0), @@ -256,30 +269,29 @@ async def ensure_object(create_func, patch_func, body, kind): 'Could not find ingress/%s after creating it' % safe_name, ) + @_await_async_init async def delete_route(self, routespec): # We just ensure that these objects are deleted. # This means if some of them are already deleted, we just let it # be. + safe_name = self.safe_name_for_routespec(routespec).lower() delete_options = client.V1DeleteOptions(grace_period_seconds=0) - delete_endpoint = self.asynchronize( - self.core_api.delete_namespaced_endpoints, + delete_endpoint = await self.core_api.delete_namespaced_endpoints( name=safe_name, namespace=self.namespace, body=delete_options, ) - delete_service = self.asynchronize( - self.core_api.delete_namespaced_service, + delete_service = await self.core_api.delete_namespaced_service( name=safe_name, namespace=self.namespace, body=delete_options, ) - delete_ingress = self.asynchronize( - self.extension_api.delete_namespaced_ingress, + delete_ingress = await self.extension_api.delete_namespaced_ingress( name=safe_name, namespace=self.namespace, body=delete_options, @@ -293,10 +305,13 @@ async def delete_route(self, routespec): # explicitly ourselves as well. In the future, we can probably try a # foreground cascading deletion (https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#foreground-cascading-deletion) # instead, but for now this works well enough. - await self.delete_if_exists('endpoint', safe_name, delete_endpoint) - await self.delete_if_exists('service', safe_name, delete_service) - await self.delete_if_exists('ingress', safe_name, delete_ingress) + await asyncio.gather( + self.delete_if_exists('endpoint', safe_name, delete_endpoint), + self.delete_if_exists('service', safe_name, delete_service), + self.delete_if_exists('ingress', safe_name, delete_ingress), + ) + @_await_async_init async def get_all_routes(self): # copy everything, because iterating over this directly is not threadsafe # FIXME: is this performance intensive? It could be! Measure? diff --git a/kubespawner/reflector.py b/kubespawner/reflector.py index ac0c61d3..af57c6e5 100644 --- a/kubespawner/reflector.py +++ b/kubespawner/reflector.py @@ -1,13 +1,14 @@ # specifically use concurrent.futures for threadsafety # asyncio Futures cannot be used across threads +import asyncio import json import threading import time from concurrent.futures import Future from functools import partial -from kubernetes import config -from kubernetes import watch +from kubernetes_asyncio import config +from kubernetes_asyncio import watch from traitlets import Any from traitlets import Bool from traitlets import Dict @@ -27,6 +28,15 @@ class ResourceReflector(LoggingConfigurable): kubernetes resources. Must be subclassed once per kind of resource that needs watching. + + Creating a reflector should be done with the create() classmethod, + since that, in addition to creating the instance starts the watch task. + + Shutting down a reflector should be done by awaiting its stop() method. + + KubeSpawner does not do this, because its reflectors are singleton + instances shared among multiple spawners. 
The watch task therefore runs + until JupyterHub exits. """ labels = Dict( @@ -158,7 +168,10 @@ class ResourceReflector(LoggingConfigurable): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - # client configuration for kubernetes has already taken place + + # Client configuration for kubernetes, as done via the load_config + # function, has already taken place in KubeSpawner or KubeIngressProxy + # initialization steps. self.api = shared_client(self.api_group_name) # FIXME: Protect against malicious labels? @@ -203,12 +216,9 @@ def __init__(self, *args, **kwargs): if not self.list_method_name: raise RuntimeError("Reflector list_method_name must be set!") - self.start() - - def __del__(self): - self.stop() + self.watch_task = None - def _list_and_update(self): + async def _list_and_update(self): """ Update current list of resources by doing a full fetch. @@ -224,9 +234,10 @@ def _list_and_update(self): if not self.omit_namespace: kwargs["namespace"] = self.namespace - initial_resources = getattr(self.api, self.list_method_name)(**kwargs) + list_method = getattr(self.api, self.list_method_name) + initial_resources_raw = await list_method(**kwargs) # This is an atomic operation on the dictionary! - initial_resources = json.loads(initial_resources.read()) + initial_resources = json.loads(await initial_resources_raw.read()) self.resources = { f'{p["metadata"]["namespace"]}/{p["metadata"]["name"]}': p for p in initial_resources["items"] @@ -234,12 +245,10 @@ def _list_and_update(self): # return the resource version so we can hook up a watch return initial_resources["metadata"]["resourceVersion"] - def _watch_and_update(self): + async def _watch_and_update(self): """ Keeps the current list of resources up-to-date - This method is to be run not on the main thread! - We first fetch the list of current resources, and store that. Then we register to be notified of changes to those resources, and keep our local store up-to-date based on these notifications. @@ -251,11 +260,9 @@ def _watch_and_update(self): changes that might've been missed in the time we were not doing a watch. - Note that we're playing a bit with fire here, by updating a dictionary - in this thread while it is probably being read in another thread - without using locks! However, dictionary access itself is atomic, - and as long as we don't try to mutate them (do a 'fetch / modify / - update' cycle on them), we should be ok! + Since the resources are read-only in the Spawner (where they are + used), then this is safe. The Spawner's view of the world might be + out-of-date, but it's not going to corrupt any data. """ selectors = [] log_name = "" @@ -283,7 +290,7 @@ def _watch_and_update(self): start = time.monotonic() w = watch.Watch() try: - resource_version = self._list_and_update() + resource_version = await self._list_and_update() if not self.first_load_future.done(): # signal that we've loaded our initial data self.first_load_future.set_result(None) @@ -300,41 +307,47 @@ def _watch_and_update(self): if self.timeout_seconds: # set watch timeout watch_args['timeout_seconds'] = self.timeout_seconds + # Calling the method with _preload_content=False is a performance + # optimization making the Kubernetes client do less work. See + # https://github.com/jupyterhub/kubespawner/pull/424. 
method = partial( getattr(self.api, self.list_method_name), _preload_content=False ) - # in case of timeout_seconds, the w.stream just exits (no exception thrown) - # -> we stop the watcher and start a new one - for watch_event in w.stream(method, **watch_args): - # Remember that these events are k8s api related WatchEvents - # objects, not k8s Event or Pod representations, they will - # reside in the WatchEvent's object field depending on what - # kind of resource is watched. - # - # ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#watchevent-v1-meta - # ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#event-v1-core - cur_delay = 0.1 - resource = watch_event['object'] - ref_key = "{}/{}".format( - resource["metadata"]["namespace"], resource["metadata"]["name"] - ) - if watch_event['type'] == 'DELETED': - # This is an atomic delete operation on the dictionary! - self.resources.pop(ref_key, None) - else: - # This is an atomic operation on the dictionary! - self.resources[ref_key] = resource - if self._stop_event.is_set(): - self.log.info("%s watcher stopped", self.kind) - break - watch_duration = time.monotonic() - start - if watch_duration >= self.restart_seconds: - self.log.debug( - "Restarting %s watcher after %i seconds", - self.kind, - watch_duration, + async with w.stream(method, **watch_args) as stream: + async for watch_event in stream: + # in case of timeout_seconds, the w.stream just exits (no exception thrown) + # -> we stop the watcher and start a new one + # Remember that these events are k8s api related WatchEvents + # objects, not k8s Event or Pod representations, they will + # reside in the WatchEvent's object field depending on what + # kind of resource is watched. + # + # ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#watchevent-v1-meta + # ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#event-v1-core + cur_delay = 0.1 + resource = watch_event['raw_object'] + ref_key = "{}/{}".format( + resource["metadata"]["namespace"], + resource["metadata"]["name"], ) - break + if watch_event['type'] == 'DELETED': + # This is an atomic delete operation on the dictionary! + self.resources.pop(ref_key, None) + else: + # This is an atomic operation on the dictionary! + self.resources[ref_key] = resource + if self.stopped(): + self.log.info("%s watcher stopped: inner", self.kind) + break + watch_duration = time.monotonic() - start + if watch_duration >= self.restart_seconds: + self.log.debug( + "Restarting %s watcher after %i seconds", + self.kind, + watch_duration, + ) + break + except ReadTimeoutError: # network read time out, just continue and restart the watch # this could be due to a network problem or just low activity @@ -350,19 +363,19 @@ def _watch_and_update(self): self.log.exception( "Error when watching resources, retrying in %ss", cur_delay ) - time.sleep(cur_delay) + await asyncio.sleep(cur_delay) continue else: # no events on watch, reconnect self.log.debug("%s watcher timeout", self.kind) finally: w.stop() - if self._stop_event.is_set(): - self.log.info("%s watcher stopped", self.kind) + if self.stopped(): + self.log.info("%s watcher stopped: outer", self.kind) break self.log.warning("%s watcher finished", self.kind) - def start(self): + async def start(self): """ Start the reflection process! @@ -372,17 +385,29 @@ def start(self): start of program initialization (when the singleton is being created), and not afterwards! 
""" - if hasattr(self, 'watch_thread'): - raise ValueError('Thread watching for resources is already running') + if self.watch_task and not self.watch_task.done(): + raise RuntimeError('Task watching for resources is already running') - self._list_and_update() - self.watch_thread = threading.Thread(target=self._watch_and_update) - # If the watch_thread is only thread left alive, exit app - self.watch_thread.daemon = True - self.watch_thread.start() + await self._list_and_update() + self.watch_task = asyncio.create_task(self._watch_and_update()) - def stop(self): + async def stop(self): + """ + Cleanly shut down the watch task. + """ self._stop_event.set() + # The watch task should now be in the process of terminating. Give + # it a bit... + if self.watch_task and not self.watch_task.done(): + try: + timeout = 5 + await asyncio.wait_for(self.watch_task, timeout) + except asyncio.TimeoutError: + # Raising the TimeoutError will cancel the task. + self.log.warning( + f"Watch task did not finish in {timeout}s and was cancelled" + ) + self.watch_task = None def stopped(self): return self._stop_event.is_set() diff --git a/kubespawner/spawner.py b/kubespawner/spawner.py index 6c9e9d26..27f53082 100644 --- a/kubespawner/spawner.py +++ b/kubespawner/spawner.py @@ -5,30 +5,25 @@ implementation that should be used by JupyterHub. """ import asyncio -import multiprocessing import os import signal import string import sys import warnings -from concurrent.futures import ThreadPoolExecutor -from datetime import timedelta from functools import partial +from functools import wraps from urllib.parse import urlparse import escapism -import kubernetes.config from jinja2 import BaseLoader from jinja2 import Environment from jupyterhub.spawner import Spawner from jupyterhub.traitlets import Command from jupyterhub.utils import exponential_backoff -from kubernetes import client -from kubernetes.client.rest import ApiException +from kubernetes_asyncio import client +from kubernetes_asyncio.client.rest import ApiException from slugify import slugify from tornado import gen -from tornado.concurrent import run_on_executor -from tornado.ioloop import IOLoop from traitlets import Bool from traitlets import default from traitlets import Dict @@ -39,6 +34,7 @@ from traitlets import Union from traitlets import validate +from .clients import load_config from .clients import shared_client from .objects import make_namespace from .objects import make_owner_reference @@ -123,12 +119,6 @@ class KubeSpawner(Spawner): spawned by a user will have its own KubeSpawner instance. """ - # We want to have one single threadpool executor that is shared across all - # KubeSpawner instances, so we apply a Singleton pattern. We initialize this - # class variable from the first KubeSpawner instance that is created and - # then reference it from all instances. The same goes for the PodReflector - # and EventReflector. - executor = None reflectors = { "pods": None, "events": None, @@ -158,6 +148,10 @@ def __init__(self, *args, **kwargs): _mock = kwargs.pop('_mock', False) super().__init__(*args, **kwargs) + # Schedules async initialization logic that is to be awaited by async + # functions by decorating them with @_await_async_init. 
+ self._async_init_future = asyncio.ensure_future(self._async_init()) + if _mock: # runs during test execution only if 'user' not in kwargs: @@ -176,10 +170,10 @@ def __init__(self, *args, **kwargs): self.hub = hub # We have to set the namespace (if user namespaces are enabled) - # before we start the reflectors, so this must run before - # watcher start in normal execution. We still want to get the - # namespace right for test, though, so we need self.user to have - # been set in order to do that. + # before we start the reflectors, so this must run before + # watcher start in normal execution. We still want to get the + # namespace right for test, though, so we need self.user to have + # been set in order to do that. # By now, all the traitlets have been set, so we can use them to # compute other attributes @@ -188,30 +182,6 @@ def __init__(self, *args, **kwargs): self.namespace = self._expand_user_properties(self.user_namespace_template) self.log.info("Using user namespace: {}".format(self.namespace)) - if not _mock: - # runs during normal execution only - - if self.__class__.executor is None: - self.log.debug( - 'Starting executor thread pool with %d workers', - self.k8s_api_threadpool_workers, - ) - self.__class__.executor = ThreadPoolExecutor( - max_workers=self.k8s_api_threadpool_workers - ) - - # Set global kubernetes client configurations - # before reflector.py code runs - self._set_k8s_client_configuration() - self.api = shared_client('CoreV1Api') - - # This will start watching in __init__, so it'll start the first - # time any spawner object is created. Not ideal but works! - self._start_watching_pods() - if self.events_enabled: - self._start_watching_events() - - # runs during both test and normal execution self.pod_name = self._expand_user_properties(self.pod_name_template) self.dns_name = self.dns_name_template.format( namespace=self.namespace, name=self.pod_name @@ -227,25 +197,55 @@ def __init__(self, *args, **kwargs): # The attribute needs to exist, even though it is unset to start with self._start_future = None - def _set_k8s_client_configuration(self): - # The actual (singleton) Kubernetes client will be created - # in clients.py shared_client but the configuration - # for token / ca_cert / k8s api host is set globally - # in kubernetes.py syntax. It is being set here - # and this method called prior to shared_client - # for readability / coupling with traitlets values - try: - kubernetes.config.load_incluster_config() - except kubernetes.config.ConfigException: - kubernetes.config.load_kube_config() - if self.k8s_api_ssl_ca_cert: - global_conf = client.Configuration.get_default_copy() - global_conf.ssl_ca_cert = self.k8s_api_ssl_ca_cert - client.Configuration.set_default(global_conf) - if self.k8s_api_host: - global_conf = client.Configuration.get_default_copy() - global_conf.host = self.k8s_api_host - client.Configuration.set_default(global_conf) + async def _async_init(self): + """ + This method is scheduled to run from `__init__`, but not awaited there + as it can't be marked as async. + + Since JupyterHub won't await this method, we ensure the async methods + JupyterHub may call on this object will await this method before + continuing. To do this, we decorate them with `_await_async_init`. + + But, how do we figure out the methods to decorate? Likely only those + exposed by the base class that JupyterHub would know about. The base + class is Spawner, as declared in spawner.py: + https://github.com/jupyterhub/jupyterhub/blob/HEAD/jupyterhub/spawner.py. 
+ + From the Proxy class docstring we can conclude that the following + methods, if implemented, could be what we need to decorate with + _await_async_init: + + - load_state (implemented) + - get_state (implemented) + - start (implemented and decorated) + - stop (implemented and decorated) + - poll (implemented and decorated) + + Out of these, it seems that only `start`, `stop`, and `poll` would need + the initialization logic in this method to have completed. + + This is slightly complicated by the fact that `start` is already a + synchronous method that returns a future, so where we want the + decorator is actually on the async `_start` that `start` calls. + """ + await load_config(caller=self) + self.api = shared_client("CoreV1Api") + await self._start_watching_pods() + if self.events_enabled: + await self._start_watching_events() + + def _await_async_init(method): + """A decorator to await the _async_init method after having been + scheduled to run in the `__init__` method.""" + + @wraps(method) + async def async_method(self, *args, **kwargs): + if self._async_init_future is not None: + await self._async_init_future + self._async_init_future = None + return await method(self, *args, **kwargs) + + return async_method k8s_api_ssl_ca_cert = Unicode( "", @@ -274,16 +274,11 @@ def _set_k8s_client_configuration(self): ) k8s_api_threadpool_workers = Integer( - # Set this explicitly, since this is the default in Python 3.5+ - # but not in 3.4 - 5 * multiprocessing.cpu_count(), config=True, help=""" - Number of threads in thread pool used to talk to the k8s API. - - Increase this if you are dealing with a very large number of users. + DEPRECATED in KubeSpawner 3.0.0. - Defaults to `5 * cpu_cores`, which is the default for `ThreadPoolExecutor`. + No longer has any effect, as there is no threadpool anymore. """, ) @@ -2055,6 +2050,7 @@ def load_state(self, state): if 'pod_name' in state: self.pod_name = state['pod_name'] + @_await_async_init async def poll(self): """ Check if the pod is still running. @@ -2125,10 +2121,6 @@ def _normalize_url(url): # pod doesn't exist or has been deleted return 1 - @run_on_executor - def asynchronize(self, method, *args, **kwargs): - return method(*args, **kwargs) - @property def events(self): """Filter event-reflector to just this pods events @@ -2216,7 +2208,7 @@ async def progress(self): break await asyncio.sleep(1) - def _start_reflector( + async def _start_reflector( self, kind=None, reflector_class=ResourceReflector, @@ -2236,7 +2228,6 @@ def _start_reflector( If replace=True, a running pod reflector will be stopped and a new one started (for recovering from possible errors). """ - main_loop = IOLoop.current() key = kind ReflectorClass = reflector_class @@ -2259,15 +2250,16 @@ def on_reflector_failure(): on_failure=on_reflector_failure, **kwargs, ) + await self.__class__.reflectors[key].start() if replace and previous_reflector: # we replaced the reflector, stop the old one - previous_reflector.stop() + await previous_reflector.stop() # return the current reflector return self.__class__.reflectors[key] - def _start_watching_events(self, replace=False): + async def _start_watching_events(self, replace=False): """Start the events reflector If replace=False and the event reflector is already running, @@ -2276,7 +2268,7 @@ def _start_watching_events(self, replace=False): If replace=True, a running pod reflector will be stopped and a new one started (for recovering from possible errors). 
""" - return self._start_reflector( + return await self._start_reflector( kind="events", reflector_class=EventReflector, fields={"involvedObject.kind": "Pod"}, @@ -2284,7 +2276,7 @@ def _start_watching_events(self, replace=False): replace=replace, ) - def _start_watching_pods(self, replace=False): + async def _start_watching_pods(self, replace=False): """Start the pod reflector If replace=False and the pod reflector is already running, @@ -2295,15 +2287,13 @@ def _start_watching_pods(self, replace=False): """ pod_reflector_class = PodReflector pod_reflector_class.labels.update({"component": self.component_label}) - return self._start_reflector( + return await self._start_reflector( "pods", PodReflector, omit_namespace=self.enable_user_namespaces, replace=replace, ) - # record a future for the call to .start() - # so we can use it to terminate .progress() def start(self): """Thin wrapper around self._start @@ -2327,17 +2317,15 @@ async def _make_create_pod_request(self, pod, request_timeout): self.log.info( f"Attempting to create pod {pod.metadata.name}, with timeout {request_timeout}" ) - # Use tornado's timeout, _request_timeout seems unreliable? - await gen.with_timeout( - timedelta(seconds=request_timeout), - self.asynchronize( - self.api.create_namespaced_pod, + await asyncio.wait_for( + self.api.create_namespaced_pod( self.namespace, pod, ), + request_timeout, ) return True - except gen.TimeoutError: + except asyncio.TimeoutError: # Just try again return False except ApiException as e: @@ -2369,16 +2357,15 @@ async def _make_create_pvc_request(self, pvc, request_timeout): self.log.info( f"Attempting to create pvc {pvc.metadata.name}, with timeout {request_timeout}" ) - await gen.with_timeout( - timedelta(seconds=request_timeout), - self.asynchronize( - self.api.create_namespaced_persistent_volume_claim, + await asyncio.wait_for( + self.api.create_namespaced_persistent_volume_claim( namespace=self.namespace, body=pvc, ), + request_timeout, ) return True - except gen.TimeoutError: + except asyncio.TimeoutError: # Just try again return False except ApiException as e: @@ -2391,8 +2378,7 @@ async def _make_create_pvc_request(self, pvc, request_timeout): t, v, tb = sys.exc_info() try: - await self.asynchronize( - self.api.read_namespaced_persistent_volume_claim, + await self.api.read_namespaced_persistent_volume_claim( name=pvc_name, namespace=self.namespace, ) @@ -2423,11 +2409,11 @@ async def _ensure_not_exists(self, kind, name): # first, attempt to delete the resource try: self.log.info(f"Deleting {kind}/{name}") - await gen.with_timeout( - timedelta(seconds=self.k8s_api_request_timeout), - self.asynchronize(delete, namespace=self.namespace, name=name), + await asyncio.wait_for( + delete(namespace=self.namespace, name=name), + self.k8s_api_request_timeout, ) - except gen.TimeoutError: + except asyncio.TimeoutError: # Just try again return False except ApiException as e: @@ -2440,11 +2426,10 @@ async def _ensure_not_exists(self, kind, name): try: self.log.info(f"Checking for {kind}/{name}") - await gen.with_timeout( - timedelta(seconds=self.k8s_api_request_timeout), - self.asynchronize(read, namespace=self.namespace, name=name), + await asyncio.wait_for( + read(namespace=self.namespace, name=name), self.k8s_api_request_timeout ) - except gen.TimeoutError: + except asyncio.TimeoutError: # Just try again return False except ApiException as e: @@ -2465,16 +2450,10 @@ async def _make_create_resource_request(self, kind, manifest): create = getattr(self.api, f"create_namespaced_{kind}") 
self.log.info(f"Attempting to create {kind} {manifest.metadata.name}") try: - # Use tornado's timeout, _request_timeout seems unreliable? - await gen.with_timeout( - timedelta(seconds=self.k8s_api_request_timeout), - self.asynchronize( - create, - self.namespace, - manifest, - ), + await asyncio.wait_for( + create(self.namespace, manifest), self.k8s_api_request_timeout ) - except gen.TimeoutError: + except asyncio.TimeoutError: # Just try again return False except ApiException as e: @@ -2488,6 +2467,7 @@ async def _make_create_resource_request(self, kind, manifest): else: return True + @_await_async_init async def _start(self): """Start the user's pod""" @@ -2644,18 +2624,17 @@ async def _make_delete_pod_request( ref_key = "{}/{}".format(self.namespace, pod_name) self.log.info("Deleting pod %s", ref_key) try: - await gen.with_timeout( - timedelta(seconds=request_timeout), - self.asynchronize( - self.api.delete_namespaced_pod, + await asyncio.wait_for( + self.api.delete_namespaced_pod( name=pod_name, namespace=self.namespace, body=delete_options, grace_period_seconds=grace_seconds, ), + request_timeout, ) return True - except gen.TimeoutError: + except asyncio.TimeoutError: return False except ApiException as e: if e.status == 404: @@ -2677,16 +2656,15 @@ async def _make_delete_pvc_request(self, pvc_name, request_timeout): """ self.log.info("Deleting pvc %s", pvc_name) try: - await gen.with_timeout( - timedelta(seconds=request_timeout), - self.asynchronize( - self.api.delete_namespaced_persistent_volume_claim, + await asyncio.wait_for( + self.api.delete_namespaced_persistent_volume_claim( name=pvc_name, namespace=self.namespace, ), + request_timeout, ) return True - except gen.TimeoutError: + except asyncio.TimeoutError: return False except ApiException as e: if e.status == 404: @@ -2699,6 +2677,7 @@ async def _make_delete_pvc_request(self, pvc_name, request_timeout): else: raise + @_await_async_init async def stop(self, now=False): delete_options = client.V1DeleteOptions() @@ -2892,9 +2871,9 @@ async def _ensure_namespace(self): ns = make_namespace(self.namespace) api = self.api try: - await gen.with_timeout( - timedelta(seconds=self.k8s_api_request_timeout), - self.asynchronize(api.create_namespace, ns), + await asyncio.wait_for( + api.create_namespace(ns), + self.k8s_api_request_timeout, ) except ApiException as e: if e.status != 409: diff --git a/pyproject.toml b/pyproject.toml index a7630d48..99e00de3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,6 @@ [tool.black] skip-string-normalization = true target_version = [ - "py36", "py37", "py38", ] diff --git a/setup.cfg b/setup.cfg index 7967ad45..13dcd2d0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,3 +4,4 @@ test=pytest [tool:pytest] # Ignore thousands of tests in dependencies installed in a virtual environment norecursedirs = lib lib64 +asyncio_mode = auto diff --git a/setup.py b/setup.py index fd77b159..5f4558d6 100644 --- a/setup.py +++ b/setup.py @@ -6,8 +6,8 @@ from setuptools import setup v = sys.version_info -if v[:2] < (3, 6): - error = "ERROR: jupyterhub-kubespawner requires Python version 3.6 or above." +if v[:2] < (3, 7): + error = "ERROR: kubespawner requires Python version 3.7 or above." 
print(error, file=sys.stderr) sys.exit(1) @@ -20,15 +20,16 @@ 'python-slugify', 'jupyterhub>=0.8', 'jinja2', - 'kubernetes>=10.1.0', + 'kubernetes_asyncio>=19.15.1', 'urllib3', 'pyYAML', ], - python_requires='>=3.6', + python_requires='>=3.7', extras_require={ 'test': [ 'bump2version', 'flake8', + 'kubernetes>=11', 'pytest>=5.4', 'pytest-cov', 'pytest-asyncio>=0.11.0', diff --git a/tests/conftest.py b/tests/conftest.py index d70b8d09..d2355464 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ """pytest fixtures for kubespawner""" +import asyncio import base64 import inspect import io @@ -7,26 +8,31 @@ import sys import tarfile import time -from distutils.version import LooseVersion as V from functools import partial +from threading import Event from threading import Thread -import kubernetes +import kubernetes_asyncio import pytest +import pytest_asyncio from jupyterhub.app import JupyterHub from jupyterhub.objects import Hub -from kubernetes.client import V1ConfigMap -from kubernetes.client import V1Namespace -from kubernetes.client import V1Pod -from kubernetes.client import V1PodSpec -from kubernetes.client import V1Secret -from kubernetes.client import V1Service -from kubernetes.client import V1ServicePort -from kubernetes.client import V1ServiceSpec -from kubernetes.client.rest import ApiException -from kubernetes.config import load_kube_config -from kubernetes.stream import stream -from kubernetes.watch import Watch +from kubernetes import __version__ as sync_version +from kubernetes.client import CoreV1Api as sync_CoreV1Api +from kubernetes.config import load_kube_config as sync_load_kube_config +from kubernetes.stream import stream as sync_stream +from kubernetes_asyncio.client import V1ConfigMap +from kubernetes_asyncio.client import V1Namespace +from kubernetes_asyncio.client import V1Pod +from kubernetes_asyncio.client import V1PodSpec +from kubernetes_asyncio.client import V1Secret +from kubernetes_asyncio.client import V1Service +from kubernetes_asyncio.client import V1ServicePort +from kubernetes_asyncio.client import V1ServiceSpec +from kubernetes_asyncio.client.rest import ApiException +from kubernetes_asyncio.config import load_kube_config +from kubernetes_asyncio.watch import Watch +from packaging.version import Version as V from traitlets.config import Config from kubespawner.clients import shared_client @@ -34,6 +40,18 @@ here = os.path.abspath(os.path.dirname(__file__)) jupyterhub_config_py = os.path.join(here, "jupyterhub_config.py") +# We do these to set up the synchronous client, needed for executing +# python inside pods. +sync_load_kube_config() +sync_corev1api = sync_CoreV1Api() + + +@pytest.fixture(scope="session") +def event_loop(): + loop = asyncio.get_event_loop() + yield loop + loop.close() + @pytest.fixture(autouse=True) def traitlets_logging(): @@ -101,20 +119,18 @@ def ssl_app(tmpdir_factory, kube_ns): return app -def watch_logs(kube_client, pod_info): +async def watch_logs(kube_client, pod_info): """Stream a single pod's logs - pod logs are streamed directly to sys.stderr, + pod logs are streamed directly to sys.stdout, so that pytest capture can deal with it. - Blocking, should be run in a thread. 
- Called for each new pod from watch_kubernetes """ watch = Watch() while True: try: - for event in watch.stream( + async for event in watch.stream( func=kube_client.read_namespaced_pod_log, namespace=pod_info.namespace, name=pod_info.name, @@ -124,53 +140,73 @@ def watch_logs(kube_client, pod_info): if e.status == 400: # 400 can occur if the container is not yet ready # wait and retry - time.sleep(1) + await asyncio.sleep(1) continue elif e.status == 404: # pod is gone, we are done return else: - # unexpeced error + # unexpected error print(f"Error watching logs for {pod_info.name}: {e}", file=sys.stderr) raise else: break -def watch_kubernetes(kube_client, kube_ns): +async def watch_kubernetes(kube_client, kube_ns): """Stream kubernetes events to stdout so that pytest io capturing can include k8s events and logs All events are streamed to stdout - When a new pod is started, spawn an additional thread to watch its logs + When a new pod is started, spawn an additional task to watch its logs """ - log_threads = {} - watch = Watch() - for event in watch.stream( - func=kube_client.list_namespaced_event, - namespace=kube_ns, - ): - resource = event['object'] - obj = resource.involved_object - print(f"k8s event ({event['type']} {obj.kind}/{obj.name}): {resource.message}") + watch = Watch() + watch_task = {} - # new pod appeared, start streaming its logs - if ( - obj.kind == "Pod" - and event["type"] == "ADDED" - and obj.name not in log_threads + try: + async for event in watch.stream( + func=kube_client.list_namespaced_event, + namespace=kube_ns, ): - log_threads[obj.name] = t = Thread( - target=watch_logs, args=(kube_client, obj), daemon=True - ) - t.start() + resource = event['object'] + obj = resource.involved_object + print( + f"k8s event ({event['type']} {obj.kind}/{obj.name}): {resource.message}" + ) -@pytest.fixture(scope="session") -def kube_client(request, kube_ns): + # new pod appeared, start streaming its logs + if ( + obj.kind == "Pod" + and event["type"] == "ADDED" + and obj.name not in watch_task + ): + watch_task[obj.name] = asyncio.create_task( + watch_logs( + kube_client, + obj, + ), + ) + + except asyncio.CancelledError as exc: + # kube_client cleanup cancelled us. In turn, we should cancel + # the individual watch tasks. + for t in watch_task: + if watch_task[t] and not watch_task[t].done(): + try: + watch_task[t].cancel() + except asyncio.CancelledError: + # Swallow these; they are what we expect. + pass + # And re-raise so kube_client can finish cleanup + raise exc + + +@pytest_asyncio.fixture(scope="session") +async def kube_client(request, kube_ns): """fixture for the Kubernetes client object. 
     skips test that require kubernetes if kubernetes cannot be contacted

@@ -179,29 +215,38 @@ def kube_client(request, kube_ns):
     - Hooks up kubernetes events and logs to pytest capture
     - Cleans up kubernetes namespace on exit
     """
-    load_kube_config()
+    await load_kube_config()
     client = shared_client("CoreV1Api")
+    stop_signal = asyncio.Queue()

     try:
-        namespaces = client.list_namespace(_request_timeout=3)
+        namespaces = await client.list_namespace(_request_timeout=3)
     except Exception as e:
         pytest.skip("Kubernetes not found: %s" % e)

     if not any(ns.metadata.name == kube_ns for ns in namespaces.items):
         print("Creating namespace %s" % kube_ns)
-        client.create_namespace(V1Namespace(metadata=dict(name=kube_ns)))
+        await client.create_namespace(V1Namespace(metadata=dict(name=kube_ns)))
     else:
         print("Using existing namespace %s" % kube_ns)

     # begin streaming all logs and events in our test namespace
-    t = Thread(target=watch_kubernetes, args=(client, kube_ns), daemon=True)
-    t.start()
+    t = asyncio.create_task(watch_kubernetes(client, kube_ns))
+
+    yield client

-    # delete the test namespace when we finish
-    def cleanup_namespace():
-        client.delete_namespace(kube_ns, body={}, grace_period_seconds=0)
-        for i in range(3):
+    # Clean up at close by sending a cancel to watch_kubernetes and letting
+    # it handle the signal, cancel the tasks *it* started, and then raising
+    # it back to us.
+    try:
+        t.cancel()
+    except asyncio.CancelledError:
+        pass
+    # allow opting out of namespace cleanup, for post-mortem debugging
+    if not os.environ.get("KUBESPAWNER_DEBUG_NAMESPACE"):
+        await client.delete_namespace(kube_ns, body={}, grace_period_seconds=0)
+        for i in range(20):  # Usually finishes a good deal faster
             try:
-                ns = client.read_namespace(kube_ns)
+                ns = await client.read_namespace(kube_ns)
             except ApiException as e:
                 if e.status == 404:
                     return
@@ -209,19 +254,14 @@ def cleanup_namespace():
                 raise
             else:
                 print("waiting for %s to delete" % kube_ns)
-                time.sleep(1)
-
-    # allow opting out of namespace cleanup, for post-mortem debugging
-    if not os.environ.get("KUBESPAWNER_DEBUG_NAMESPACE"):
-        request.addfinalizer(cleanup_namespace)
-    return client
+                await asyncio.sleep(1)


-def wait_for_pod(kube_client, kube_ns, pod_name, timeout=90):
+async def wait_for_pod(kube_client, kube_ns, pod_name, timeout=90):
     """Wait for a pod to be ready"""
     conditions = {}
     for i in range(int(timeout)):
-        pod = kube_client.read_namespaced_pod(namespace=kube_ns, name=pod_name)
+        pod = await kube_client.read_namespaced_pod(namespace=kube_ns, name=pod_name)
         for condition in pod.status.conditions or []:
             conditions[condition.type] = condition.status

@@ -229,7 +269,7 @@ def wait_for_pod(kube_client, kube_ns, pod_name, timeout=90):
             print(
                 f"Waiting for pod {kube_ns}/{pod_name}; current status: {pod.status.phase}; {conditions}"
             )
-            time.sleep(1)
+            await asyncio.sleep(1)
         else:
             break

@@ -238,7 +278,7 @@ def wait_for_pod(kube_client, kube_ns, pod_name, timeout=90):
     return pod


-def ensure_not_exists(kube_client, kube_ns, name, resource_type, timeout=30):
+async def ensure_not_exists(kube_client, kube_ns, name, resource_type, timeout=30):
     """Ensure an object doesn't exist

     Request deletion and wait for it to be gone
@@ -246,7 +286,7 @@ def ensure_not_exists(kube_client, kube_ns, name, resource_type, timeout=30):
     delete = getattr(kube_client, "delete_namespaced_{}".format(resource_type))
     read = getattr(kube_client, "read_namespaced_{}".format(resource_type))
     try:
-        delete(namespace=kube_ns, name=name)
+        await delete(namespace=kube_ns, name=name)
     except ApiException as e:
         if e.status != 404:
             raise
@@ -254,7 +294,7 @@ def ensure_not_exists(kube_client, kube_ns, name, resource_type, timeout=30):
     while True:
         # wait for delete
         try:
-            read(namespace=kube_ns, name=name)
+            await read(namespace=kube_ns, name=name)
         except ApiException as e:
             if e.status == 404:
                 # deleted
@@ -263,10 +303,12 @@ def ensure_not_exists(kube_client, kube_ns, name, resource_type, timeout=30):
                 raise
         else:
             print("waiting for {}/{} to delete".format(resource_type, name))
-            time.sleep(1)
+            await asyncio.sleep(1)


-def create_resource(kube_client, kube_ns, resource_type, manifest, delete_first=True):
+async def create_resource(
+    kube_client, kube_ns, resource_type, manifest, delete_first=True
+):
     """Create a kubernetes resource

     handling 409 errors and others that can occur due to rapid startup
@@ -274,13 +316,13 @@ def create_resource(kube_client, kube_ns, resource_type, manifest, delete_first=
     """
     name = manifest.metadata["name"]
     if delete_first:
-        ensure_not_exists(kube_client, kube_ns, name, resource_type)
+        await ensure_not_exists(kube_client, kube_ns, name, resource_type)
     print(f"Creating {resource_type} {name}")
     create = getattr(kube_client, f"create_namespaced_{resource_type}")
     error = None
     for i in range(10):
         try:
-            create(
+            await create(
                 body=manifest,
                 namespace=kube_ns,
             )
@@ -290,14 +332,14 @@ def create_resource(kube_client, kube_ns, resource_type, manifest, delete_first=
             error = e
             # need to retry since this can fail if run too soon after namespace creation
             print(e, file=sys.stderr)
-            time.sleep(int(e.headers.get("Retry-After", 1)))
+            await asyncio.sleep(int(e.headers.get("Retry-After", 1)))
         else:
             break
     else:
         raise error


-def create_hub_pod(kube_client, kube_ns, pod_name="hub", ssl=False):
+async def create_hub_pod(kube_client, kube_ns, pod_name="hub", ssl=False):
     config_map_name = pod_name + "-config"
     secret_name = pod_name + "-secret"
     with open(jupyterhub_config_py) as f:
@@ -307,7 +349,7 @@ def create_hub_pod(kube_client, kube_ns, pod_name="hub", ssl=False):
         metadata={"name": config_map_name}, data={"jupyterhub_config.py": config}
     )

-    config_map = create_resource(
+    config_map = await create_resource(
         kube_client,
         kube_ns,
         "config_map",
@@ -360,14 +402,14 @@ def create_hub_pod(kube_client, kube_ns, pod_name="hub", ssl=False):
             ],
         ),
     )
-    pod = create_resource(kube_client, kube_ns, "pod", pod_manifest)
-    return wait_for_pod(kube_client, kube_ns, pod_name)
+    pod = await create_resource(kube_client, kube_ns, "pod", pod_manifest)
+    return await wait_for_pod(kube_client, kube_ns, pod_name)


-@pytest.fixture(scope="session")
-def hub_pod(kube_client, kube_ns):
+@pytest_asyncio.fixture(scope="session")
+async def hub_pod(kube_client, kube_ns):
     """Create and return a pod running jupyterhub"""
-    return create_hub_pod(kube_client, kube_ns)
+    return await create_hub_pod(kube_client, kube_ns)


 @pytest.fixture
@@ -379,8 +421,8 @@ def hub(hub_pod):
     return Hub(ip=hub_pod.status.pod_ip, port=8081)


-@pytest.fixture(scope="session")
-def hub_pod_ssl(kube_client, kube_ns, ssl_app):
+@pytest_asyncio.fixture(scope="session")
+async def hub_pod_ssl(kube_client, kube_ns, ssl_app):
     """Start a hub pod with internal_ssl enabled"""
     # load ssl dir to tarfile
     buf = io.BytesIO()
@@ -393,7 +435,7 @@ def hub_pod_ssl(kube_client, kube_ns, ssl_app):

     secret_manifest = V1Secret(
         metadata={"name": secret_name}, data={"internal-ssl.tar": b64_certs}
     )
-    create_resource(kube_client, kube_ns, "secret", secret_manifest)
+    await create_resource(kube_client, kube_ns, "secret", secret_manifest)

     name = "hub-ssl"
@@ -406,9 +448,9 @@ def hub_pod_ssl(kube_client, kube_ns, ssl_app):
         ),
     )

-    create_resource(kube_client, kube_ns, "service", service_manifest)
+    await create_resource(kube_client, kube_ns, "service", service_manifest)

-    return create_hub_pod(
+    return await create_hub_pod(
         kube_client,
         kube_ns,
         pod_name=name,
@@ -443,7 +485,9 @@ def __str__(self):
         )


-def _exec_python_in_pod(kube_client, kube_ns, pod_name, code, kwargs=None, _retries=0):
+async def _exec_python_in_pod(
+    kube_client, kube_ns, pod_name, code, kwargs=None, _retries=0
+):
     """Run simple Python code in a pod

     code can be a str of code, or a 'simple' Python function,
@@ -451,11 +495,7 @@ def _exec_python_in_pod(kube_client, kube_ns, pod_name, code, kwargs=None, _retr

     kwargs are passed to the function, if it is given.
     """
-    if V(kubernetes.__version__) < V("11"):
-        pytest.skip(
-            f"exec tests require kubernetes >= 11, got {kubernetes.__version__}"
-        )
-    pod = wait_for_pod(kube_client, kube_ns, pod_name)
+    pod = await wait_for_pod(kube_client, kube_ns, pod_name)
     original_code = code
     if not isinstance(code, str):
         # allow simple self-contained (no globals or args) functions
@@ -479,8 +519,14 @@ def _exec_python_in_pod(kube_client, kube_ns, pod_name, code, kwargs=None, _retr
     print("Running {} in {}".format(code, pod_name))
     # need to create ws client to get returncode,
     # see https://github.com/kubernetes-client/python/issues/812
-    client = stream(
-        kube_client.connect_get_namespaced_pod_exec,
+    #
+    # That's why we are using the synchronous Kubernetes client here
+    # and why we imported them in the first place: kubernetes_asyncio
+    # does not yet support multichannel ws clients, which are needed
+    # to get the return code.
+    # cf https://github.com/tomplus/kubernetes_asyncio/issues/12
+    client = sync_stream(
+        sync_corev1api.connect_get_namespaced_pod_exec,
         pod_name,
         namespace=kube_ns,
         command=exec_command,
@@ -503,8 +549,8 @@ def _exec_python_in_pod(kube_client, kube_ns, pod_name, code, kwargs=None, _retr
         raise ExecError(exit_code=returncode, message=stderr, command=code)
     else:
         # retry
-        time.sleep(1)
-        return _exec_python_in_pod(
+        await asyncio.sleep(1)
+        return await _exec_python_in_pod(
             kube_client,
             kube_ns,
             pod_name,
@@ -524,8 +570,8 @@ def exec_python_pod(kube_client, kube_ns):
     return partial(_exec_python_in_pod, kube_client, kube_ns)


-@pytest.fixture(scope="session")
-def exec_python(kube_ns, kube_client):
+@pytest_asyncio.fixture(scope="session")
+async def exec_python(kube_client, kube_ns):
     """Return a callable to execute Python code in a pod in the test namespace

     This fixture creates a dedicated pod for executing commands
@@ -550,6 +596,6 @@ def exec_python(kube_ns, kube_client):
             termination_grace_period_seconds=0,
         ),
     )
-    pod = create_resource(kube_client, kube_ns, "pod", pod_manifest)
+    pod = await create_resource(kube_client, kube_ns, "pod", pod_manifest)

     yield partial(_exec_python_in_pod, kube_client, kube_ns, pod_name)
diff --git a/tests/test_multi_namespace_spawner.py b/tests/test_multi_namespace_spawner.py
index f9742455..b64321ef 100644
--- a/tests/test_multi_namespace_spawner.py
+++ b/tests/test_multi_namespace_spawner.py
@@ -7,12 +7,12 @@
 from jupyterhub.objects import Hub
 from jupyterhub.objects import Server
 from jupyterhub.orm import Spawner
-from kubernetes.client import V1Namespace
-from kubernetes.client.models import V1Capabilities
-from kubernetes.client.models import V1Container
-from kubernetes.client.models import V1Pod
-from kubernetes.client.models import V1SecurityContext
-from kubernetes.config import load_kube_config
+from kubernetes_asyncio.client import V1Namespace
+from kubernetes_asyncio.client.models import V1Capabilities
+from kubernetes_asyncio.client.models import V1Container
+from kubernetes_asyncio.client.models import V1Pod
+from kubernetes_asyncio.client.models import V1SecurityContext
+from kubernetes_asyncio.config import load_kube_config
 from traitlets.config import Config

 from kubespawner import KubeSpawner
@@ -62,20 +62,18 @@ async def test_multi_namespace_spawn():

     # get a client
     kube_ns = spawner.namespace
-    load_kube_config()
     client = shared_client('CoreV1Api')

     # the spawner will create the namespace on its own.

     # Wrap in a try block so we clean up the namespace.
-    saved_exception = None

     try:
         # start the spawner
         await spawner.start()
         # verify the pod exists
-        pods = client.list_namespaced_pod(kube_ns).items
+        pods = (await client.list_namespaced_pod(kube_ns)).items
         pod_names = [p.metadata.name for p in pods]
         assert "jupyter-%s" % spawner.user.name in pod_names
         # verify poll while running
@@ -84,15 +82,11 @@ async def test_multi_namespace_spawn():
         # stop the pod
         await spawner.stop()
         # verify pod is gone
-        pods = client.list_namespaced_pod(kube_ns).items
+        pods = (await client.list_namespaced_pod(kube_ns)).items
         pod_names = [p.metadata.name for p in pods]
         assert "jupyter-%s" % spawner.user.name not in pod_names
         # verify exit status
         status = await spawner.poll()
         assert isinstance(status, int)
-    except Exception as saved_exception:
-        pass  # We will raise after namespace removal
-    # remove namespace
-    client.delete_namespace(kube_ns, body={})
-    if saved_exception is not None:
-        raise saved_exception
+    finally:
+        await client.delete_namespace(kube_ns, body={})
diff --git a/tests/test_objects.py b/tests/test_objects.py
index 8e7310e6..dcaab1b3 100644
--- a/tests/test_objects.py
+++ b/tests/test_objects.py
@@ -2,7 +2,7 @@
 Test functions used to create k8s objects
 """
 import pytest
-from kubernetes.client import ApiClient
+from kubernetes_asyncio.client import ApiClient

 from kubespawner.objects import make_ingress
 from kubespawner.objects import make_namespace
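Aside, for illustration only (not part of the patch): the test changes that follow all lean on one idiom. With kubernetes_asyncio, every API method returns a coroutine, so the response must be awaited before its .items attribute can be read, which is why the converted code uses the extra parentheses in (await client.list_namespaced_pod(kube_ns)).items. A minimal sketch of that pattern as a standalone helper; the name list_pod_names is hypothetical and does not appear anywhere in this PR:

    async def list_pod_names(client, namespace):
        """Return the names of all pods in a namespace (illustrative helper only)."""
        # await the coroutine first, then read .items off the V1PodList response
        pods = (await client.list_namespaced_pod(namespace)).items
        return [pod.metadata.name for pod in pods]

The tests below deliberately inline this pattern at each call site rather than wrapping it in a helper.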
diff --git a/tests/test_spawner.py b/tests/test_spawner.py
index bf2d04bf..fb2063be 100644
--- a/tests/test_spawner.py
+++ b/tests/test_spawner.py
@@ -1,3 +1,4 @@
+import asyncio
 import json
 import os
 import time
@@ -7,11 +8,11 @@
 from jupyterhub.objects import Hub
 from jupyterhub.objects import Server
 from jupyterhub.orm import Spawner
-from kubernetes.client.models import V1Capabilities
-from kubernetes.client.models import V1Container
-from kubernetes.client.models import V1PersistentVolumeClaim
-from kubernetes.client.models import V1Pod
-from kubernetes.client.models import V1SecurityContext
+from kubernetes_asyncio.client.models import V1Capabilities
+from kubernetes_asyncio.client.models import V1Container
+from kubernetes_asyncio.client.models import V1PersistentVolumeClaim
+from kubernetes_asyncio.client.models import V1Pod
+from kubernetes_asyncio.client.models import V1SecurityContext
 from traitlets.config import Config

 from kubespawner import KubeSpawner
@@ -122,8 +123,8 @@ def check_up(url, ssl_ca=None, ssl_client_cert=None, ssl_client_key=None):

     Uses stdlib only because requests isn't always available in the target pod
     """
-    from urllib import request
     import ssl
+    from urllib import request

     if ssl_ca:
         context = ssl.create_default_context(
@@ -174,12 +175,12 @@ async def test_spawn_start(
     url = await spawner.start()

     # verify the pod exists
-    pods = kube_client.list_namespaced_pod(kube_ns).items
+    pods = (await kube_client.list_namespaced_pod(kube_ns)).items
     pod_names = [p.metadata.name for p in pods]
     assert pod_name in pod_names

     # pod should be running when start returns
-    pod = kube_client.read_namespaced_pod(namespace=kube_ns, name=pod_name)
+    pod = await kube_client.read_namespaced_pod(namespace=kube_ns, name=pod_name)
     assert pod.status.phase == "Running"

     # verify poll while running
@@ -187,14 +188,14 @@ async def test_spawn_start(
     assert status is None

     # make sure spawn url is correct
-    r = exec_python(check_up, {"url": url}, _retries=3)
+    r = await exec_python(check_up, {"url": url}, _retries=3)
     assert r == "302"

     # stop the pod
     await spawner.stop()

     # verify pod is gone
-    pods = kube_client.list_namespaced_pod(kube_ns).items
+    pods = (await kube_client.list_namespaced_pod(kube_ns)).items
     pod_names = [p.metadata.name for p in pods]
     assert pod_name not in pod_names

@@ -234,7 +235,7 @@ async def test_spawn_internal_ssl(
     url = await spawner.start()
     pod_name = "jupyter-%s" % spawner.user.name
     # verify the pod exists
-    pods = kube_client.list_namespaced_pod(kube_ns).items
+    pods = (await kube_client.list_namespaced_pod(kube_ns)).items
     pod_names = [p.metadata.name for p in pods]
     assert pod_name in pod_names
     # verify poll while running
@@ -243,12 +244,12 @@ async def test_spawn_internal_ssl(

     # verify service and secret exist
     secret_name = spawner.secret_name
-    secrets = kube_client.list_namespaced_secret(kube_ns).items
+    secrets = (await kube_client.list_namespaced_secret(kube_ns)).items
     secret_names = [s.metadata.name for s in secrets]
     assert secret_name in secret_names

     service_name = pod_name
-    services = kube_client.list_namespaced_service(kube_ns).items
+    services = (await kube_client.list_namespaced_service(kube_ns)).items
     service_names = [s.metadata.name for s in services]
     assert service_name in service_names

@@ -262,7 +263,7 @@ async def test_spawn_internal_ssl(
     hub_internal_cert = os.path.join(hub_internal, "hub-internal.crt")
     hub_internal_key = os.path.join(hub_internal, "hub-internal.key")

-    r = exec_python_pod(
+    r = await exec_python_pod(
         hub_pod_name,
         check_up,
         {
@@ -279,20 +280,20 @@ async def test_spawn_internal_ssl(
     await spawner.stop()

     # verify pod is gone
-    pods = kube_client.list_namespaced_pod(kube_ns).items
+    pods = (await kube_client.list_namespaced_pod(kube_ns)).items
     pod_names = [p.metadata.name for p in pods]
     assert "jupyter-%s" % spawner.user.name not in pod_names

     # verify service and secret are gone
     # it may take a little while for them to get cleaned up
     for i in range(5):
-        secrets = kube_client.list_namespaced_secret(kube_ns).items
+        secrets = (await kube_client.list_namespaced_secret(kube_ns)).items
         secret_names = {s.metadata.name for s in secrets}

-        services = kube_client.list_namespaced_service(kube_ns).items
+        services = (await kube_client.list_namespaced_service(kube_ns)).items
         service_names = {s.metadata.name for s in services}
         if secret_name in secret_names or service_name in service_names:
-            time.sleep(1)
+            await asyncio.sleep(1)
         else:
             break
     assert secret_name not in secret_names
@@ -777,13 +778,15 @@ async def test_delete_pvc(kube_ns, kube_client, hub, config):

     # verify the pod exists
     pod_name = "jupyter-%s" % spawner.user.name
-    pods = kube_client.list_namespaced_pod(kube_ns).items
+    pods = (await kube_client.list_namespaced_pod(kube_ns)).items
     pod_names = [p.metadata.name for p in pods]
     assert pod_name in pod_names

     # verify PVC is created
     pvc_name = spawner.pvc_name
-    pvc_list = kube_client.list_namespaced_persistent_volume_claim(kube_ns).items
+    pvc_list = (
+        await kube_client.list_namespaced_persistent_volume_claim(kube_ns)
+    ).items
     pvc_names = [s.metadata.name for s in pvc_list]
     assert pvc_name in pvc_names

@@ -791,7 +794,7 @@ async def test_delete_pvc(kube_ns, kube_client, hub, config):
     await spawner.stop()

     # verify pod is gone
-    pods = kube_client.list_namespaced_pod(kube_ns).items
+    pods = (await kube_client.list_namespaced_pod(kube_ns)).items
     pod_names = [p.metadata.name for p in pods]
     assert "jupyter-%s" % spawner.user.name not in pod_names

@@ -800,10 +803,12 @@ async def test_delete_pvc(kube_ns, kube_client, hub, config):

     # verify PVC is deleted, it may take a little while
     for i in range(5):
-        pvc_list = kube_client.list_namespaced_persistent_volume_claim(kube_ns).items
+        pvc_list = (
+            await kube_client.list_namespaced_persistent_volume_claim(kube_ns)
+        ).items
         pvc_names = [s.metadata.name for s in pvc_list]
         if pvc_name in pvc_names:
-            time.sleep(1)
+            await asyncio.sleep(1)
         else:
             break
     assert pvc_name not in pvc_names
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 129b1868..845119c4 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -2,11 +2,11 @@
 import pytest
 from conftest import ExecError
-from kubernetes.client.models import V1Capabilities
-from kubernetes.client.models import V1Container
-from kubernetes.client.models import V1Lifecycle
-from kubernetes.client.models import V1PodSpec
-from kubernetes.client.models import V1SecurityContext
+from kubernetes_asyncio.client.models import V1Capabilities
+from kubernetes_asyncio.client.models import V1Container
+from kubernetes_asyncio.client.models import V1Lifecycle
+from kubernetes_asyncio.client.models import V1PodSpec
+from kubernetes_asyncio.client.models import V1SecurityContext

 from kubespawner.utils import _get_k8s_model_attribute
 from kubespawner.utils import get_k8s_model

@@ -33,16 +33,16 @@ def exec_error():
     1 / 0


-def test_exec(exec_python):
+async def test_exec(exec_python):
     """Test the exec fixture itself"""
-    r = exec_python(print_hello)
+    r = await exec_python(print_hello)
     print("result: %r" % r)


-def test_exec_error(exec_python):
+async def test_exec_error(exec_python):
     """Test the exec fixture error handling"""
     with pytest.raises(ExecError) as e:
-        exec_python(exec_error)
+        await exec_python(exec_error)


 def test__get_k8s_model_attribute():
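A closing illustration, again not part of the patch: with the conftest.py fixtures above declared via pytest_asyncio.fixture and every API call awaited, the tests that consume them become plain async functions. The sketch below assumes pytest-asyncio is configured to collect async def tests, as the converted tests rely on; the test name and assertion are hypothetical and only show how the shared kubernetes_asyncio client and the test namespace fixtures fit together:

    async def test_no_leftover_user_pods(kube_client, kube_ns):
        # kube_client is the shared kubernetes_asyncio CoreV1Api instance from
        # conftest.py; kube_ns names the namespace that fixture creates.
        pods = (await kube_client.list_namespaced_pod(kube_ns)).items
        leftovers = [p.metadata.name for p in pods if p.metadata.name.startswith("jupyter-")]
        assert leftovers == []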