Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

progress messages for jupyterhub 0.9 #153

Merged
merged 10 commits into from
May 24, 2018
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ python:
- nightly
- 3.6
- 3.5
- 3.4

# install dependencies
install:
Expand Down
30 changes: 28 additions & 2 deletions kubespawner/reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ class NamespacedResourceReflector(LoggingConfigurable):
"""
)

fields = Dict(
{},
config=True,
help="""
Fields to restrict the reflected objects
"""
)

namespace = Unicode(
None,
allow_none=True,
Expand Down Expand Up @@ -88,11 +96,16 @@ def __init__(self, *args, **kwargs):

# FIXME: Protect against malicious labels?
self.label_selector = ','.join(['{}={}'.format(k, v) for k, v in self.labels.items()])
self.field_selector = ','.join(['{}={}'.format(k, v) for k, v in self.fields.items()])

self.first_load_future = Future()
self._stop_event = threading.Event()

self.start()

def __del__(self):
self.stop()

def _list_and_update(self):
"""
Update current list of resources by doing a full fetch.
Expand All @@ -101,7 +114,8 @@ def _list_and_update(self):
"""
initial_resources = getattr(self.api, self.list_method_name)(
self.namespace,
label_selector=self.label_selector
label_selector=self.label_selector,
field_selector=self.field_selector
)
# This is an atomic operation on the dictionary!
self.resources = {p.metadata.name: p for p in initial_resources.items}
Expand Down Expand Up @@ -132,8 +146,8 @@ def _watch_and_update(self):
update' cycle on them), we should be ok!
"""
cur_delay = 0.1
self.log.info("watching for %s with label selector %s / field selector %s in namespace %s", self.kind, self.label_selector, self.field_selector, self.namespace)
while True:
self.log.info("watching for %s with label selector %s in namespace %s", self.kind, self.label_selector, self.namespace)
w = watch.Watch()
try:
resource_version = self._list_and_update()
Expand All @@ -144,7 +158,9 @@ def _watch_and_update(self):
getattr(self.api, self.list_method_name),
self.namespace,
label_selector=self.label_selector,
field_selector=self.field_selector,
resource_version=resource_version,
timeout_seconds=10,
):
cur_delay = 0.1
resource = ev['object']
Expand All @@ -154,6 +170,9 @@ def _watch_and_update(self):
else:
# This is an atomic operation on the dictionary!
self.resources[resource.metadata.name] = resource
if self._stop_event.is_set():
break

except Exception:
cur_delay = cur_delay * 2
if cur_delay > 30:
Expand All @@ -166,6 +185,9 @@ def _watch_and_update(self):
continue
finally:
w.stop()
if self._stop_event.is_set():
self.log.info("%s watcher stopped", self.kind)
break

def start(self):
"""
Expand All @@ -186,4 +208,8 @@ def start(self):
self.watch_thread.daemon = True
self.watch_thread.start()

def stop(self):
self._stop_event.set()

def stopped(self):
return self._stop_event.is_set()
48 changes: 47 additions & 1 deletion kubespawner/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from kubespawner.utils import Callable
from kubespawner.objects import make_pod, make_pvc
from kubespawner.reflector import NamespacedResourceReflector

from asyncio import sleep
from async_generator import async_generator, yield_

class PodReflector(NamespacedResourceReflector):
kind = 'pods'
Expand All @@ -44,6 +45,15 @@ class PodReflector(NamespacedResourceReflector):
def pods(self):
return self.resources

class EventReflector(NamespacedResourceReflector):
kind = 'events'

list_method_name = 'list_namespaced_event'

@property
def events(self):
return sorted(self.resources.values(), key = lambda x : x.last_timestamp)

class KubeSpawner(Spawner):
"""
Implement a JupyterHub spawner to spawn pods in a Kubernetes Cluster.
Expand Down Expand Up @@ -1112,6 +1122,24 @@ def poll(self):
def asynchronize(self, method, *args, **kwargs):
return method(*args, **kwargs)

@async_generator
async def progress(self):
next_event = 0
self.log.debug('progress generator: %s', self.pod_name)

while not self.events.stopped():
len_events = len(self.events.events)
if next_event < len_events:
for i in range(next_event, len_events):
event = self.events.events[i]
await yield_({
'progress': 50,
'message': "%s [%s] %s" % (event.last_timestamp, event.type, event.message)
})
next_event = len_events
await sleep(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This while loop needs to end at some point. How do you signal that there will be no more events?



@gen.coroutine
def start(self):
if self.user_storage_pvc_ensure:
Expand All @@ -1128,6 +1156,17 @@ def start(self):
else:
raise

main_loop = IOLoop.current()
def on_reflector_failure():
self.log.critical("Events reflector failed, halting Hub.")
main_loop.stop()

# events are selected based on pod name, which will include previous launch/stop
self.events = EventReflector(
parent=self, namespace=self.namespace,
fields={'involvedObject.kind': 'Pod', 'involvedObject.name': self.pod_name},
on_failure=on_reflector_failure
)
# If we run into a 409 Conflict error, it means a pod with the
# same name already exists. We stop it, wait for it to stop, and
# try again. We try 4 times, and if it still fails we give up.
Expand Down Expand Up @@ -1167,6 +1206,13 @@ def start(self):
)

pod = self.pod_reflector.pods[self.pod_name]

self.log.debug('pod %s events before launch: %s',
self.pod_name, "\n".join(["%s [%s] %s" % (event.last_timestamp, event.type, event.message) for event in self.events.events]))

# Note: we stop the event watcher once launch is successful, but the reflector
# will only stop when the next event comes in, likely when it is stopped.
self.events.stop()
return (pod.status.pod_ip, self.port)

@gen.coroutine
Expand Down
10 changes: 10 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
from __future__ import print_function
from setuptools import setup, find_packages
import sys

v = sys.version_info
if v[:2] < (3, 5):
error = "ERROR: jupyterhub-kubespawner requires Python version 3.5 or above."
print(error, file=sys.stderr)
sys.exit(1)

setup(
name='jupyterhub-kubespawner',
Expand All @@ -9,7 +17,9 @@
'kubernetes==4.*',
'escapism',
'jinja2',
'async_generator>=1.8',
],
python_requires = ">=3.5",
setup_requires=['pytest-runner'],
tests_require=['pytest'],
description='JupyterHub Spawner targeting Kubernetes',
Expand Down