From 39193d7cb2b91793cf6a0a60bbe60f35708c8281 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Thu, 16 Sep 2021 17:22:59 -0700 Subject: [PATCH] fixes for pod/job metadata race conditions --- metaflow/plugins/aws/eks/kubernetes.py | 4 +- metaflow/plugins/aws/eks/kubernetes_client.py | 213 ++++++++++++------ 2 files changed, 148 insertions(+), 69 deletions(-) diff --git a/metaflow/plugins/aws/eks/kubernetes.py b/metaflow/plugins/aws/eks/kubernetes.py index f1acfb30fd4..8c2a945b0c0 100644 --- a/metaflow/plugins/aws/eks/kubernetes.py +++ b/metaflow/plugins/aws/eks/kubernetes.py @@ -358,9 +358,7 @@ def _print_available(tail, stream, should_persist=False): "%s. This could be a transient error. " "Use @retry to retry." % msg ) - elif not self._job.is_done: - # Kill the job if it is still running by throwing an exception. - raise KubernetesKilledException("Task failed!") + exit_code, _ = self._job.reason echo( "Task finished with exit code %s." % exit_code, diff --git a/metaflow/plugins/aws/eks/kubernetes_client.py b/metaflow/plugins/aws/eks/kubernetes_client.py index 2e7cd0a3e96..25041b3e29c 100644 --- a/metaflow/plugins/aws/eks/kubernetes_client.py +++ b/metaflow/plugins/aws/eks/kubernetes_client.py @@ -1,4 +1,5 @@ import os +import time try: unicode @@ -8,6 +9,12 @@ from metaflow.exception import MetaflowException +# The pod object may not be created immediately after submitting the job, +# we need to have some tolerance here when fetching the pod. +POD_FETCH_BACKOFF_SECONDS = 1.0 +POD_FETCH_RETRIES = 20 + +CLIENT_REFRESH_INTERVAL_SECONDS = 300 class KubernetesJobException(MetaflowException): headline = "Kubernetes job error" @@ -28,6 +35,10 @@ def __init__(self): "Could not import module 'kubernetes'. Install kubernetes " "Python package (https://pypi.org/project/kubernetes/) first." ) + self._refresh_client() + + def _refresh_client(self): + from kubernetes import client, config if os.getenv("KUBERNETES_SERVICE_HOST"): # We’re inside a pod, authenticate via ServiceAccount assigned to us config.load_incluster_config() @@ -40,14 +51,21 @@ def __init__(self): # good enough for the initial rollout. config.load_kube_config() self._client = client + self._client_refresh_timestamp = time.time() def job(self, **kwargs): - return KubernetesJob(self._client, **kwargs) + return KubernetesJob(self, **kwargs) + + def get(self): + if time.time() - self._client_refresh_timestamp > CLIENT_REFRESH_INTERVAL_SECONDS: + self._refresh_client() + + return self._client class KubernetesJob(object): - def __init__(self, client, **kwargs): - self._client = client + def __init__(self, client_wrapper, **kwargs): + self._client_wrapper = client_wrapper self._kwargs = kwargs # Kubernetes namespace defaults to `default` @@ -104,10 +122,11 @@ def create(self): # # Note: This implementation ensures that there is only one unique Pod # (unique UID) per Metaflow task attempt. - self._job = self._client.V1Job( + client = self._client_wrapper.get() + self._job = client.V1Job( api_version="batch/v1", kind="Job", - metadata=self._client.V1ObjectMeta( + metadata=client.V1ObjectMeta( # Annotations are for humans annotations=self._kwargs.get("annotations", {}), # While labels are for Kubernetes @@ -115,7 +134,7 @@ def create(self): name=self._kwargs["name"], # Unique within the namespace namespace=self._kwargs["namespace"], # Defaults to `default` ), - spec=self._client.V1JobSpec( + spec=client.V1JobSpec( # Retries are handled by Metaflow when it is responsible for # executing the flow. The responsibility is moved to Kubernetes # when AWS Step Functions / Argo are responsible for the @@ -128,14 +147,14 @@ def create(self): * 60 * 60 # Remove job after a week. TODO (savin): Make this * 24, # configurable - template=self._client.V1PodTemplateSpec( - metadata=self._client.V1ObjectMeta( + template=client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta( annotations=self._kwargs.get("annotations", {}), labels=self._kwargs.get("labels", {}), name=self._kwargs["name"], namespace=self._kwargs["namespace"], ), - spec=self._client.V1PodSpec( + spec=client.V1PodSpec( # Timeout is set on the pod and not the job (important!) active_deadline_seconds=self._kwargs[ "timeout_in_seconds" @@ -148,10 +167,10 @@ def create(self): # roll out. # affinity=?, containers=[ - self._client.V1Container( + client.V1Container( command=self._kwargs["command"], env=[ - self._client.V1EnvVar(name=k, value=str(v)) + client.V1EnvVar(name=k, value=str(v)) for k, v in self._kwargs.get( "environment_variables", {} ).items() @@ -163,10 +182,10 @@ def create(self): # TODO: Figure out a way to make job # metadata visible within the container + [ - self._client.V1EnvVar( + client.V1EnvVar( name=k, - value_from=self._client.V1EnvVarSource( - field_ref=self._client.V1ObjectFieldSelector( + value_from=client.V1EnvVarSource( + field_ref=client.V1ObjectFieldSelector( field_path=str(v) ) ), @@ -178,8 +197,8 @@ def create(self): }.items() ], env_from=[ - self._client.V1EnvFromSource( - secret_ref=self._client.V1SecretEnvSource( + client.V1EnvFromSource( + secret_ref=client.V1SecretEnvSource( name=str(k) ) ) @@ -187,7 +206,7 @@ def create(self): ], image=self._kwargs["image"], name=self._kwargs["name"], - resources=self._client.V1ResourceRequirements( + resources=client.V1ResourceRequirements( requests={ "cpu": str(self._kwargs["cpu"]), "memory": "%sM" @@ -239,25 +258,26 @@ def create(self): return self def execute(self): + client = self._client_wrapper.get() try: # TODO (savin): Make job submission back-pressure aware. Currently # there doesn't seem to be a kubernetes-native way to # achieve the guarantees that we are seeking. # Hopefully, we will be able to get creative soon. response = ( - self._client.BatchV1Api() + client.BatchV1Api() .create_namespaced_job( body=self._job, namespace=self._kwargs["namespace"] ) .to_dict() ) return RunningJob( - client=self._client, + client_wrapper=self._client_wrapper, name=response["metadata"]["name"], uid=response["metadata"]["uid"], namespace=response["metadata"]["namespace"], ) - except self._client.rest.ApiException as e: + except client.rest.ApiException as e: raise KubernetesJobException( "Unable to launch Kubernetes job.\n %s" % str(e) ) @@ -309,6 +329,22 @@ class RunningJob(object): # State Machine implementation for the lifecycle behavior documented in # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ + # + # This object encapsulates *both* V1Job and V1Pod. It simplifies the status + # to "running" and "done" (failed/succeeded) state. Note that V1Job and V1Pod + # status fields are not guaranteed to be always in sync due to the way job + # controller works. + # + # For example, for a successful job, RunningJob states and their corresponding + # K8S object states look like this: + # + # | V1JobStatus.active | V1JobStatus.succeeded | V1PodStatus.phase | RunningJob.is_running | RunningJob.is_done | + # |--------------------|-----------------------|-------------------|-----------------------|--------------------| + # | 0 | 0 | N/A | False | False | + # | 0 | 0 | Pending | False | False | + # | 1 | 0 | Running | True | False | + # | 1 | 0 | Succeeded | True | False | + # | 0 | 1 | Succeeded | False | True | # To ascertain the status of V1Job, we peer into the lifecycle status of # the pod it is responsible for executing. Unfortunately, the `phase` @@ -348,8 +384,8 @@ class RunningJob(object): JOB_ACTIVE = "job:active" JOB_FAILED = "" - def __init__(self, client, name, uid, namespace): - self._client = client + def __init__(self, client_wrapper, name, uid, namespace): + self._client_wrapper = client_wrapper self._name = name self._id = uid self._namespace = namespace @@ -367,31 +403,35 @@ def __repr__(self): ) def _fetch_job(self): + client = self._client_wrapper.get() try: return ( - self._client.BatchV1Api() + client.BatchV1Api() .read_namespaced_job(name=self._name, namespace=self._namespace) .to_dict() ) - except self._client.rest.ApiException as e: + except client.rest.ApiException as e: # TODO: Handle failures as well as the fact that a different # process can delete the job. raise e def _fetch_pod(self): + client = self._client_wrapper.get() try: - # TODO (savin): pods may not appear immediately or they may - # disappear - return ( - self._client.CoreV1Api() - .list_namespaced_pod( - namespace=self._namespace, - label_selector="job-name={}".format(self._name), - ) - .to_dict()["items"] - or [None] - )[0] - except self._client.rest.ApiException as e: + # Pod objects may not get created immediately after job submission + for _ in range(POD_FETCH_RETRIES): + pods = client.CoreV1Api().list_namespaced_pod( + namespace=self._namespace, + label_selector="job-name={}".format(self._name), + ).to_dict()["items"] + + if pods: + return pods[0] + else: + time.sleep(POD_FETCH_BACKOFF_SECONDS) + else: + raise KubernetesJobException('Could not fetch pod status in %s seconds' % (POD_FETCH_RETRIES * POD_FETCH_BACKOFF_SECONDS)) + except client.rest.ApiException as e: # TODO: Handle failures raise e @@ -418,12 +458,12 @@ def kill(self): # terminate the pod without deleting the object. # 3. If the pod object hasn't shown up yet, we set the parallelism to 0 # to preempt it. - if not self.is_done: - if self.is_running: + client = self._client_wrapper.get() + if not self._check_is_job_done(): + if self._check_is_running(): # Case 1. from kubernetes.stream import stream - - api_instance = self._client.CoreV1Api + api_instance = client.CoreV1Api try: # TODO (savin): stream opens a web-socket connection. It may # not be desirable to open multiple web-socket @@ -454,7 +494,7 @@ def kill(self): try: # TODO (savin): Also patch job annotation to reflect this # action. - self._client.BatchV1Api().patch_namespaced_job( + client.BatchV1Api().patch_namespaced_job( name=self._name, namespace=self._namespace, field_manager="metaflow", @@ -472,8 +512,7 @@ def id(self): # TODO (savin): Should we use pod id instead? return self._id - @property - def is_done(self): + def _check_is_job_done(self): def _done(): # Either the job succeeds or fails naturally or we may have # forced the pod termination causing the job to still be in an @@ -494,10 +533,20 @@ def _done(): self._job = self._fetch_job() return _done() - @property - def status(self): - if not self.is_done: - # If not done, check for newer status + + def _wait_job_done(self, timeout_seconds): + deadline = time.time() + timeout_seconds + while time.time() < deadline: + if self._check_is_job_done(): + return True + else: + time.sleep(1) + raise TimeoutError("Timed out while waiting for Job to become done") + + + def _get_status(self): + if not self._check_is_job_done(): + # If not done, check for newer pod status self._pod = self._fetch_pod() # Success! if bool(self._job["status"].get("succeeded")): @@ -534,38 +583,50 @@ def status(self): return msg return "Job:Unknown" - @property - def has_succeeded(self): + def _check_has_succeeded(self): # Job is in a terminal state and the status is marked as succeeded - return self.is_done and bool(self._job["status"].get("succeeded")) + return self._check_is_job_done() and bool(self._job["status"].get("succeeded")) - @property - def has_failed(self): + def _check_has_failed(self): # Job is in a terminal state and either the status is marked as failed # or the Job is not allowed to launch any more pods - return self.is_done and ( + return self._check_is_job_done() and ( bool(self._job["status"].get("failed")) or (self._job["spec"]["parallelism"] == 0) ) - @property - def is_running(self): - # The container is running. This happens when the Pod's phase is running - if not self.is_done: + def _check_is_running(self): + # Returns true if the container is running. There are two situations + # where is_running may return False, either: + # - the job is done + # - the container hasn't started *yet* + if not self._check_is_job_done(): # If not done, check if pod has been assigned and is in Running # phase self._pod = self._fetch_pod() - return self._pod.get("status", {}).get("phase") == "Running" + pod_phase = self._pod.get("status", {}).get("phase") + if pod_phase == "Running": + return True + elif pod_phase in ("Succeeded", "Failed"): + # Now the pod is no longer running, but we may need to wait for + # the job object metadata to update. *Usually* that happens + # pretty much immediately, but since it is technically done by + # the job controller asynchronously, there's room for a race + # condition. It is more likely to happen when the control plane + # is overloaded, e.g. on minikube/kind. + try: + self._wait_job_done(timeout_seconds=40) + except TimeoutError: + # We shouldn't really get here unless the K8S control plane + # is really unhealthy. + raise KubernetesJobException("Pod is not running but the job is not done, last status: %s" % self._get_status()) + else: + return False return False - @property - def is_waiting(self): - return not self.is_done and not self.is_running - - @property - def reason(self): - if self.is_done: - if self.has_succeeded: + def _get_done_reason(self): + if self._check_is_job_done(): + if self._check_has_succeeded(): return 0, None # Best effort since Pod object can disappear on us at anytime else: @@ -595,3 +656,23 @@ def _done(): ) return None, None + + @property + def is_done(self): + return self._check_is_job_done() + + @property + def has_failed(self): + return self._check_has_failed() + + @property + def is_running(self): + return self._check_is_running() + + @property + def reason(self): + return self._get_done_reason() + + @property + def status(self): + return self._get_status() \ No newline at end of file