diff --git a/kubespawner/reflector.py b/kubespawner/reflector.py index f209c5ce..daf8bcb4 100644 --- a/kubespawner/reflector.py +++ b/kubespawner/reflector.py @@ -2,6 +2,8 @@ # asyncio Futures cannot be used across threads from concurrent.futures import Future +from functools import partial +import json import time import threading @@ -155,11 +157,13 @@ def _list_and_update(self): label_selector=self.label_selector, field_selector=self.field_selector, _request_timeout=self.request_timeout, + _preload_content=False, ) # This is an atomic operation on the dictionary! - self.resources = {p.metadata.name: p for p in initial_resources.items} + initial_resources = json.loads(initial_resources.read()) + self.resources = {p["metadata"]["name"]: p for p in initial_resources["items"]} # return the resource version so we can hook up a watch - return initial_resources.metadata.resource_version + return initial_resources["metadata"]["resourceVersion"] def _watch_and_update(self): """ @@ -219,10 +223,11 @@ def _watch_and_update(self): if self.timeout_seconds: # set watch timeout watch_args['timeout_seconds'] = self.timeout_seconds + method = partial(getattr(self.api, self.list_method_name), _preload_content=False) # in case of timeout_seconds, the w.stream just exits (no exception thrown) # -> we stop the watcher and start a new one for watch_event in w.stream( - getattr(self.api, self.list_method_name), + method, **watch_args ): # Remember that these events are k8s api related WatchEvents @@ -236,10 +241,10 @@ def _watch_and_update(self): resource = watch_event['object'] if watch_event['type'] == 'DELETED': # This is an atomic delete operation on the dictionary! - self.resources.pop(resource.metadata.name, None) + self.resources.pop(resource["metadata"]["name"], None) else: # This is an atomic operation on the dictionary! - self.resources[resource.metadata.name] = resource + self.resources[resource["metadata"]["name"]] = resource if self._stop_event.is_set(): self.log.info("%s watcher stopped", self.kind) break diff --git a/kubespawner/spawner.py b/kubespawner/spawner.py index 8e3e803e..4f108275 100644 --- a/kubespawner/spawner.py +++ b/kubespawner/spawner.py @@ -66,9 +66,9 @@ class PodReflector(NamespacedResourceReflector): @property def pods(self): """ - A dictionary of the python kubernetes client's representation of pods - for the namespace. The dictionary keys are the pod ids and the values - are the actual pod resource representations. + A dictionary of pods for the namespace as returned by the Kubernetes + API. The dictionary keys are the pod ids and the values are + dictionaries of the actual pod resource values. ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.16/#pod-v1-core """ @@ -88,7 +88,7 @@ class EventReflector(NamespacedResourceReflector): @property def events(self): """ - Returns list of the python kubernetes client's representation of k8s + Returns list of dictionaries representing the k8s events within the namespace, sorted by the latest event. ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.16/#event-v1-core @@ -101,12 +101,13 @@ def events(self): # suddenly refreshes itself entirely. We should not assume a call to # this dictionary's values will result in a consistently ordered list, # so we sort it to get it somewhat more structured. - # - We either seem to get only event.last_timestamp or event.event_time, - # both fields serve the same role but the former is a low resolution - # timestamp without and the other is a higher resolution timestamp. + # - We either seem to get only event['lastTimestamp'] or + # event['eventTime'], both fields serve the same role but the former + # is a low resolution timestamp without and the other is a higher + # resolution timestamp. return sorted( self.resources.values(), - key=lambda event: event.last_timestamp or event.event_time, + key=lambda event: event["lastTimestamp"] or event["eventTime"], ) @@ -1492,10 +1493,10 @@ def is_pod_running(self, pod): # FIXME: Validate if this is really the best way is_running = ( pod is not None and - pod.status.phase == 'Running' and - pod.status.pod_ip is not None and - pod.metadata.deletion_timestamp is None and - all([cs.ready for cs in pod.status.container_statuses]) + pod["status"]["phase"] == 'Running' and + pod["status"]["podIP"] is not None and + "deletionTimestamp" not in pod["metadata"] and + all([cs["ready"] for cs in pod["status"]["containerStatuses"]]) ) return is_running @@ -1559,20 +1560,20 @@ def poll(self): yield self.pod_reflector.first_load_future data = self.pod_reflector.pods.get(self.pod_name, None) if data is not None: - if data.status.phase == 'Pending': + if data["status"]["phase"] == 'Pending': return None - ctr_stat = data.status.container_statuses + ctr_stat = data["status"]["containerStatuses"] if ctr_stat is None: # No status, no container (we hope) # This seems to happen when a pod is idle-culled. return 1 for c in ctr_stat: # return exit code if notebook container has terminated - if c.name == 'notebook': - if c.state.terminated: + if c["name"] == 'notebook': + if "terminated" in c["state"]: # call self.stop to delete the pod if self.delete_stopped_pods: yield self.stop(now=True) - return c.state.terminated.exit_code + return c["state"]["terminated"]["exitCode"] break # None means pod is running or starting up return None @@ -1596,11 +1597,11 @@ def events(self): events = [] for event in self.event_reflector.events: - if event.involved_object.name != self.pod_name: + if event["involvedObject"]["name"] != self.pod_name: # only consider events for my pod name continue - if self._last_event and event.metadata.uid == self._last_event: + if self._last_event and event["metadata"]["uid"]== self._last_event: # saw last_event marker, ignore any previous events # and only consider future events # only include events *after* our _last_event marker @@ -1642,7 +1643,7 @@ async def progress(self): # pod_id may change if a previous pod is being stopped # before starting a new one # use the uid of the latest event to identify 'current' - pod_id = events[-1].involved_object.uid + pod_id = events[-1]["involvedObject"]["uid"] for i in range(next_event, len_events): event = events[i] # move the progress bar. @@ -1652,20 +1653,13 @@ async def progress(self): # 30 50 63 72 78 82 84 86 87 88 88 89 progress += (90 - progress) / 3 - # V1Event isn't serializable, and neither is the datetime - # objects within it, and we need what we pass back to be - # serializable to it can be sent back from JupyterHub to - # a browser wanting to display progress. - serializable_event = json.loads( - json.dumps(event.to_dict(), default=datetime.isoformat) - ) await yield_({ 'progress': int(progress), - 'raw_event': serializable_event, + 'raw_event': event, 'message': "%s [%s] %s" % ( - event.last_timestamp or event.event_time, - event.type, - event.message, + event["lastTimestamp"] or event["eventTime"], + event["type"], + event["message"], ) }) next_event = len_events @@ -1769,7 +1763,7 @@ def _start(self): # pod if it's part of this spawn process events = self.events if events: - self._last_event = events[-1].metadata.uid + self._last_event = events[-1]["metadata"]["uid"] if self.storage_pvc_ensure: # Try and create the pvc. If it succeeds we are good. If @@ -1863,19 +1857,19 @@ def _start(self): raise pod = self.pod_reflector.pods[self.pod_name] - self.pod_id = pod.metadata.uid + self.pod_id = pod["metadata"]["uid"] if self.event_reflector: self.log.debug( 'pod %s events before launch: %s', self.pod_name, "\n".join( [ - "%s [%s] %s" % (event.last_timestamp or event.event_time, event.type, event.message) + "%s [%s] %s" % (event["lastTimestamp"] or event["eventTime"], event["type"], event["message"]) for event in self.events ] ), ) - return (pod.status.pod_ip, self.port) + return (pod["status"]["podIP"], self.port) @gen.coroutine def stop(self, now=False):