K8S: auth and pod/job race #704
Conversation
return KubernetesJob(self, **kwargs)

def get(self):
    if time.time() - self._client_refresh_timestamp < CLIENT_REFRESH_INTERVAL_SECONDS:
Shouldn't the comparison be ">" ?
yep 🤦♂️
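For context, a minimal sketch of what the corrected check presumably looks like once the comparison is flipped to ">" (the class name, the make_client factory, and the interval value are illustrative assumptions, not the actual implementation):

```python
import time

CLIENT_REFRESH_INTERVAL_SECONDS = 300  # illustrative value


class CachingClientProvider:
    """Hypothetical stand-in for the client wrapper discussed above."""

    def __init__(self, make_client):
        self._make_client = make_client
        self._client = None
        self._client_refresh_timestamp = 0

    def get(self):
        # Rebuild the client when the cached one is older than the refresh
        # interval (e.g. because auth tokens expire); otherwise reuse it.
        if time.time() - self._client_refresh_timestamp > CLIENT_REFRESH_INTERVAL_SECONDS:
            self._client = self._make_client()
            self._client_refresh_timestamp = time.time()
        return self._client
```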
Some quick comments. I haven't been able to play with this PR live.
# We shouldn't really get here unless the K8S control plane is
# really unhealthy.
echo(
    "Pod is not running but the job is not done or failed, last job state: %s" % self._job._job,
In this PR, the implementation logic from kubernetes_client.py is bleeding into this layer (pod awareness, for example). If you look at the original Kubernetes PR, job.status encapsulates the reason, which can simply be printed out here.
@@ -317,7 +317,7 @@ def _print_available(tail, stream, should_persist=False):
    now = time.time()
    log_update_delay = update_delay(now - start_time)
    next_log_update = now + log_update_delay
-   is_running = self._job.is_running
+   is_running = self._job.check_pod_is_running()
What is the reasoning behind changing job.is_running to job.check_pod_is_running?
To make it clear that it checks that the pod is running, not the k8s job. The entire issue was that is_running was checking the pod status while is_done was checking the job status; as a result, RunningJob could be neither "running" nor "done" at the same time.
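Roughly, the two checks were looking at two different API objects. A minimal sketch of the pre-fix shape (methods shown outside their class for brevity; assuming self._pod and self._job hold the raw V1Pod and V1Job dicts, with the exact field access being illustrative):

```python
@property
def is_running(self):
    # "running" was derived from the *pod* phase...
    return self._pod.get("status", {}).get("phase") == "Running"

@property
def is_done(self):
    # ...while "done" was derived from the *V1Job* status, which can lag a few
    # seconds behind the pod, so both properties could be False at once.
    status = self._job.get("status", {})
    return bool(status.get("succeeded") or status.get("failed"))
```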
Actually, now reading your comments on this PR, I realize that the "Job" in RunningJob wasn't intended to mean "Kubernetes Job" but a separate logical concept that kind of abstracts away both k8s jobs and pods. It makes sense now, but I wonder if there's a way to make it clearer for the reader; when I encounter a variable named _job in kubernetes.py it makes me think of a K8S job object.
@@ -472,8 +496,7 @@ def id(self):
        # TODO (savin): Should we use pod id instead?
        return self._id

-   @property
-   def is_done(self):
+   def check_is_done(self):
If it's not too much trouble, can we move back to @property to maintain parity with the @batch implementation?
It is definitely subjective, but when I was debugging this issue I found that @property made this code considerably harder to understand, at least for me. When I see self.is_done I assume it is either a simple variable read or maybe a thin wrapper over one, whereas in reality it does pretty complex stuff: it calls the K8S API and updates the internal state.
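To illustrate the trade-off being discussed (a sketch only; this RunningJob is simplified and _refresh_status/_done are made-up names, not the real attributes):

```python
class RunningJob:
    def __init__(self):
        self._done = False

    def _refresh_status(self):
        # In the real code this would call the K8S API and update self._done.
        ...

    @property
    def is_done(self):
        # Reads like a cheap attribute access at the call site...
        self._refresh_status()  # ...but actually does a remote call.
        return self._done

    def check_is_done(self):
        # An explicit method call makes it obvious that real work happens here.
        self._refresh_status()
        return self._done
```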
I agree with the sentiment - this is just to keep the codebase a bit consistent.
    else:
        time.sleep(POD_FETCH_BACKOFF_SECONDS)
else:
    raise Exception(
        "Could not fetch pod status in %s seconds"
        % (POD_FETCH_RETRIES * POD_FETCH_BACKOFF_SECONDS)
    )
When we throw an exception here, does it automatically terminate the k8s job?
    # done but the job is still "active". It is more likely to happen
    # when the control plane is overloaded, e.g. on minikube/kind.
    self._job.wait_done(timeout_seconds=20)
except TimeoutError:
Is it feasible to encapsulate this logic in kubernetes_client.py? Currently, that module takes care of all the complexity of ascertaining the correct job status.
Let me see. I now get your intent that RunningJob was supposed to abstract away the k8s job and pod together.
@savingoyal please take another look, I've added back the properties as an external interface for RunningJob.
elif not self._job.is_done:
    # Kill the job if it is still running by throwing an exception.
    raise KubernetesKilledException("Task failed!")
Removed, since it is no longer possible for RunningJob to be not "running" and not "done" at the same time.
A comment, but LGTM otherwise!
pod_phase = self._pod.get("status", {}).get("phase")
if pod_phase == "Running":
    return True
elif pod_phase in ("Succeeded", "Failed"):
Maybe I am missing something - why do we need to wait for 40 seconds before marking the job as not running, rather than doing so immediately?
It is about issue 3 in the PR description, and about trying to make the RunningJob state machine simpler.
The core issue here was that we'd rely on the pod status to determine "running" and on the V1Job status to determine "done", and it is technically possible for the pod to be "Succeeded" while the V1Job still has "active = 1" for a brief period, before the job controller updates the V1Job metadata. That caused a successful RunningJob to go through these states:
- not-running, not-done (haven't started yet)
- running, not-done
- not-running, not-done (again!)
- not-running, done
I wanted to get rid of this 3rd state because it is confusing for the caller, sometimes doesn't even happen, and in this case this complexity practically caused this bug. For the caller, it is also indistinguishable from the very first state, which feels like a trap for future us. So in this PR, I've changed it so that if the pod is done but the job isn't, is_running just kind of waits it out, so that the next call to is_done is guaranteed to return True. In other words, after the job has started, it is impossible for it to be not-running and not-done at the same time.
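A rough sketch of the "wait it out" behavior described above (the helper _wait_for_job_done and the exact timeout are illustrative, not the code in this PR):

```python
def check_pod_is_running(self):
    pod_phase = self._pod.get("status", {}).get("phase")
    if pod_phase == "Running":
        return True
    if pod_phase in ("Succeeded", "Failed"):
        # The pod has finished, but the V1Job may still report active=1 for a
        # short while. Block until the job controller catches up so that the
        # next check_is_done() call is guaranteed to return True.
        self._wait_for_job_done(timeout_seconds=40)  # the ~40s window discussed above
    return False
```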
I agree, but we can return False from _check_is_running immediately when pod_phase in ("Succeeded", "Failed"), and in _check_is_job_done we can add a check for _check_is_running if the V1Job status is active. That should remove the need for a 40-second poll.
So something like
def _check_is_job_done(self):
    if self._job["active"] and not self._check_is_running():
        return True
    ...
Effectively "folding" that intermediate state into the "done" state instead?
That's an option; I think what stopped me is that it would complicate is_succeeded/is_failed, since they wouldn't be able to rely on the V1Job status, as it may not be up to date yet.
We can change the logic of is_succeeded/is_failed to also inspect the status of "pod_phase". We do have this issue where the "pod" itself may disappear depending on how the job was canceled.
Another point that we need to think through, if we are introducing a check for succeeded and failed pod phases, is what happens when a new pod phase is introduced. We can consider inspecting the PodConditions to have a greater degree of certainty, although it makes reasoning about the code a bit more convoluted. That was one of the reasons why I was only looking at the Running pod phase in the original implementation.
So at the end of the day:
RunningJob.running := V1PodStatus.phase is "Running"
RunningJob.done := V1JobStatus.succeeded > 0 OR V1JobStatus.failed > 0 OR V1PodStatus.phase in ("Succeeded", "Failed") OR V1JobSpec.parallelism is 0
RunningJob.succeeded := done AND (V1PodStatus.phase is "Succeeded" OR V1JobStatus.succeeded > 0)
RunningJob.failed := done AND (V1PodStatus.phase is "Failed" OR V1JobSpec.parallelism is 0 OR V1JobStatus.failed > 0)
RunningJob.waiting := NOT (running OR done)
(If the pod doesn't exist, V1PodStatus.phase is assumed to be "null" above.)
This should ensure that the job goes through waiting -> running -> done without weird intermediate states, and that a "done" job is always either "succeeded" or "failed".
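A minimal sketch of those predicates in Python, assuming self._job and self._pod hold the V1Job and V1Pod dicts (with self._pod set to None when the pod doesn't exist); method names and field access are illustrative, not the PR's exact code:

```python
def _pod_phase(self):
    return (self._pod or {}).get("status", {}).get("phase")

def is_running(self):
    return self._pod_phase() == "Running"

def is_done(self):
    status = self._job.get("status", {})
    return bool(
        status.get("succeeded")
        or status.get("failed")
        or self._pod_phase() in ("Succeeded", "Failed")
        or self._job.get("spec", {}).get("parallelism") == 0
    )

def is_succeeded(self):
    return self.is_done() and (
        self._pod_phase() == "Succeeded"
        or bool(self._job.get("status", {}).get("succeeded"))
    )

def is_failed(self):
    return self.is_done() and (
        self._pod_phase() == "Failed"
        or self._job.get("spec", {}).get("parallelism") == 0
        or bool(self._job.get("status", {}).get("failed"))
    )

def is_waiting(self):
    return not (self.is_running() or self.is_done())
```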
Seems reasonable to me (I need to dust off my notes to find edge cases). This state diagram is pretty much the original implementation with an accommodation for the eventual consistency of the job status? This still doesn't technically ensure that the job goes through waiting -> running -> done, since pod phases can flip unpredictably to unknown.
Also, as an aside, the implementation currently also tries to optimize the number of pod and job status API calls, so the order in which these conditions are evaluated is important.
Yep, unknown is another thing we may need to revisit. I've updated the implementation to match this logic. I may also add some unit tests in a separate PR, as this seems like a perfect use case for unit testing.
Btw, I have also removed another wait loop, where I'd wait for the pod object to appear after the job was created; I think it is not necessary as long as we carefully handle pod-does-not-exist in the state machine logic.
* Refactor @resources decorator

  @resources decorator is shared by all compute related decorators - @Batch, @lambda, @K8s, @titus. This patch moves it out of batch_decorator.py so that other decorators can cleanly reference it.
* Update __init__.py
* Refactor @Batch decorator
* more change
* more changes
* more changes
* @kubernetes
* Kubernetes
* More changes
* More changes
* more changes
* some more changes
* more changes
* add disk space
* Add todos
* some fixes
* add k8s testing context
* more changes
* some more changes
* minor fixups
* better error handling for evicted pods (#711)
* fixes for pod/job metadata race conditions (#704)
* K8S: label value sanitizer (#719)
* rename name_space to namespace for k8s plugin (#750)
* fix k8s attribute handling bug (#753)
* tweak k8s test resources (to run on kind) (#754)
* add k8s api retries (#756)
* update done marker
* Use linux binaries in @conda when run in k8s (#758)

  Conda environment should pack linux python binary when run on MacOS to avoid an error: metaflow_PlayListFlow_osx-64_179c56284704ca8e53622f848a3df27cdd1f4327/bin/python: cannot execute binary file: Exec format error
* fix comment
* fix merge conflict
* update char

Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
Co-authored-by: Roman Kindruk <36699371+sappier@users.noreply.github.com>
This PR has a couple fixes as a result of running our internal test suite:
I have also slightly refactored kubernetes_client.py for readability (obviously this is very subjective).