Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking change / performance: don't make kubernetes-client deserialize k8s events into objects #424

Merged
merged 3 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions kubespawner/reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# asyncio Futures cannot be used across threads
from concurrent.futures import Future

from functools import partial
import json
import time
import threading

Expand Down Expand Up @@ -155,11 +157,13 @@ def _list_and_update(self):
label_selector=self.label_selector,
field_selector=self.field_selector,
_request_timeout=self.request_timeout,
_preload_content=False,
)
# This is an atomic operation on the dictionary!
self.resources = {p.metadata.name: p for p in initial_resources.items}
initial_resources = json.loads(initial_resources.read())
self.resources = {p["metadata"]["name"]: p for p in initial_resources["items"]}
Copy link
Contributor

@mriedem mriedem Aug 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this changes the value on the resources dict from an object to a dict so the help here should probably be updated as well:

https://github.com/jupyterhub/kubespawner/pull/424/files#diff-2f4d4efedf466e7bf4f8f083d0c722adR52

One thing I'm not sure about is how much this might impact out-of-tree reflectors (assuming those exist, which I don't know if that's supported) that are expecting self.resources to return objects rather than dicts, i.e. this could be a breaking change for them. One could maybe mitigate that by wrapping the dicts in simple objects that pass through __getattr__ to __getitem__ but as we can see below with resource_version we'd also have to camel-case-ify some of the attributes, like this.

I'm not sure what the ABI contract on something like this is within the KubeSpawner. Would a release note be sufficient?

# return the resource version so we can hook up a watch
return initial_resources.metadata.resource_version
return initial_resources["metadata"]["resourceVersion"]

def _watch_and_update(self):
"""
Expand Down Expand Up @@ -219,10 +223,11 @@ def _watch_and_update(self):
if self.timeout_seconds:
# set watch timeout
watch_args['timeout_seconds'] = self.timeout_seconds
method = partial(getattr(self.api, self.list_method_name), _preload_content=False)
# in case of timeout_seconds, the w.stream just exits (no exception thrown)
# -> we stop the watcher and start a new one
for watch_event in w.stream(
getattr(self.api, self.list_method_name),
method,
**watch_args
):
# Remember that these events are k8s api related WatchEvents
Expand All @@ -236,10 +241,10 @@ def _watch_and_update(self):
resource = watch_event['object']
if watch_event['type'] == 'DELETED':
# This is an atomic delete operation on the dictionary!
self.resources.pop(resource.metadata.name, None)
self.resources.pop(resource["metadata"]["name"], None)
else:
# This is an atomic operation on the dictionary!
self.resources[resource.metadata.name] = resource
self.resources[resource["metadata"]["name"]] = resource
if self._stop_event.is_set():
self.log.info("%s watcher stopped", self.kind)
break
Expand Down
64 changes: 29 additions & 35 deletions kubespawner/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ class PodReflector(NamespacedResourceReflector):
@property
def pods(self):
"""
A dictionary of the python kubernetes client's representation of pods
for the namespace. The dictionary keys are the pod ids and the values
are the actual pod resource representations.
A dictionary of pods for the namespace as returned by the Kubernetes
API. The dictionary keys are the pod ids and the values are
dictionaries of the actual pod resource values.

ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.16/#pod-v1-core
"""
Expand All @@ -88,7 +88,7 @@ class EventReflector(NamespacedResourceReflector):
@property
def events(self):
"""
Returns list of the python kubernetes client's representation of k8s
Returns list of dictionaries representing the k8s
events within the namespace, sorted by the latest event.

ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.16/#event-v1-core
Expand All @@ -101,12 +101,13 @@ def events(self):
# suddenly refreshes itself entirely. We should not assume a call to
# this dictionary's values will result in a consistently ordered list,
# so we sort it to get it somewhat more structured.
# - We either seem to get only event.last_timestamp or event.event_time,
# both fields serve the same role but the former is a low resolution
# timestamp without and the other is a higher resolution timestamp.
# - We either seem to get only event['lastTimestamp'] or
# event['eventTime'], both fields serve the same role but the former
# is a low resolution timestamp and the latter is a higher
# resolution timestamp.
return sorted(
self.resources.values(),
key=lambda event: event.last_timestamp or event.event_time,
key=lambda event: event["lastTimestamp"] or event["eventTime"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good example of the kind of issue I'm talking about above. Handling this for the known event and pod reflectors within this repo is sufficient but I worry about breaking out of tree reflectors, but like I said I also don't know what contract is there.

)


Expand Down Expand Up @@ -1492,10 +1493,10 @@ def is_pod_running(self, pod):
# FIXME: Validate if this is really the best way
is_running = (
pod is not None and
pod.status.phase == 'Running' and
pod.status.pod_ip is not None and
pod.metadata.deletion_timestamp is None and
all([cs.ready for cs in pod.status.container_statuses])
pod["status"]["phase"] == 'Running' and
pod["status"]["podIP"] is not None and
"deletionTimestamp" not in pod["metadata"] and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, maybe this should be:

not pod["metadata"].get("deletionTimestamp")

Because deletionTimestamp not being in the dict vs being in the dict with a None value are different things, unless that's not possible with how the kube API works?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deletionTimestamp is only set by the server when a deletion is requested. It shouldn't exist at any other time.

all([cs["ready"] for cs in pod["status"]["containerStatuses"]])
)
return is_running

Expand Down Expand Up @@ -1559,20 +1560,20 @@ def poll(self):
yield self.pod_reflector.first_load_future
data = self.pod_reflector.pods.get(self.pod_name, None)
rmoe marked this conversation as resolved.
Show resolved Hide resolved
if data is not None:
if data.status.phase == 'Pending':
if data["status"]["phase"] == 'Pending':
return None
ctr_stat = data.status.container_statuses
ctr_stat = data["status"]["containerStatuses"]
if ctr_stat is None: # No status, no container (we hope)
# This seems to happen when a pod is idle-culled.
return 1
for c in ctr_stat:
# return exit code if notebook container has terminated
if c.name == 'notebook':
if c.state.terminated:
if c["name"] == 'notebook':
if "terminated" in c["state"]:
# call self.stop to delete the pod
if self.delete_stopped_pods:
yield self.stop(now=True)
return c.state.terminated.exit_code
return c["state"]["terminated"]["exitCode"]
break
# None means pod is running or starting up
return None
Expand All @@ -1596,11 +1597,11 @@ def events(self):

events = []
for event in self.event_reflector.events:
if event.involved_object.name != self.pod_name:
if event["involvedObject"]["name"] != self.pod_name:
# only consider events for my pod name
continue

if self._last_event and event.metadata.uid == self._last_event:
if self._last_event and event["metadata"]["uid"]== self._last_event:
# saw last_event marker, ignore any previous events
# and only consider future events
# only include events *after* our _last_event marker
Expand Down Expand Up @@ -1642,7 +1643,7 @@ async def progress(self):
# pod_id may change if a previous pod is being stopped
# before starting a new one
# use the uid of the latest event to identify 'current'
pod_id = events[-1].involved_object.uid
pod_id = events[-1]["involvedObject"]["uid"]
for i in range(next_event, len_events):
event = events[i]
# move the progress bar.
Expand All @@ -1652,20 +1653,13 @@ async def progress(self):
# 30 50 63 72 78 82 84 86 87 88 88 89
progress += (90 - progress) / 3

# V1Event isn't serializable, and neither is the datetime
# objects within it, and we need what we pass back to be
# serializable to it can be sent back from JupyterHub to
# a browser wanting to display progress.
serializable_event = json.loads(
json.dumps(event.to_dict(), default=datetime.isoformat)
)
await yield_({
'progress': int(progress),
'raw_event': serializable_event,
'raw_event': event,
'message': "%s [%s] %s" % (
event.last_timestamp or event.event_time,
event.type,
event.message,
event["lastTimestamp"] or event["eventTime"],
event["type"],
event["message"],
)
})
next_event = len_events
Expand Down Expand Up @@ -1769,7 +1763,7 @@ def _start(self):
# pod if it's part of this spawn process
events = self.events
if events:
self._last_event = events[-1].metadata.uid
self._last_event = events[-1]["metadata"]["uid"]

if self.storage_pvc_ensure:
# Try and create the pvc. If it succeeds we are good. If
Expand Down Expand Up @@ -1863,19 +1857,19 @@ def _start(self):
raise

pod = self.pod_reflector.pods[self.pod_name]
self.pod_id = pod.metadata.uid
self.pod_id = pod["metadata"]["uid"]
if self.event_reflector:
self.log.debug(
'pod %s events before launch: %s',
self.pod_name,
"\n".join(
[
"%s [%s] %s" % (event.last_timestamp or event.event_time, event.type, event.message)
"%s [%s] %s" % (event["lastTimestamp"] or event["eventTime"], event["type"], event["message"])
for event in self.events
]
),
)
return (pod.status.pod_ip, self.port)
return (pod["status"]["podIP"], self.port)

@gen.coroutine
def stop(self, now=False):
Expand Down