[Serve] Fix serve non atomic shutdown (#36927)
Previously we relied on the client to wait for all resources to be released before shutting off the controller. If the client interrupted the process, this could leave the shutdown incomplete. In this PR we moved the shutdown logic into the controller's event loop, triggered by a `_shutting_down` flag on the controller. Even if the client interrupts the process, the controller will continue to shut down all the resources and then kill itself.
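In outline, the new flow looks like the sketch below: the client flips a flag, the control loop notices it and runs the teardown on each tick, and an asyncio event signals completion. This is a simplified, Ray-free skeleton, not the actual controller; `_release_all_resources` is a hypothetical stand-in for the checkpoint/application/deployment/endpoint/proxy teardown in the diff, and the real controller ends by calling `ray.kill` on its own actor handle rather than breaking out of the loop.

import asyncio

class Controller:
    """Minimal sketch of the flag-driven shutdown pattern (simplified)."""

    def __init__(self):
        self._shutting_down = False       # flipped by graceful_shutdown()
        self._shutdown = asyncio.Event()  # set once every resource is released

    async def run_control_loop(self) -> None:
        while True:
            if self._shutting_down:
                try:
                    self.shutdown()
                except Exception:
                    pass  # keep looping; shutdown is retried next iteration
            if self._shutdown.is_set():
                break  # the real controller instead ray.kill()s itself
            await asyncio.sleep(0.1)

    def shutdown(self) -> None:
        if not self._shutting_down:
            return
        if self._release_all_resources():
            self._shutdown.set()  # unblock waiting graceful_shutdown() callers

    def _release_all_resources(self) -> bool:
        # Hypothetical stand-in for deleting the config checkpoint and
        # tearing down applications, deployments, endpoints, and proxies.
        return True

    async def graceful_shutdown(self, wait: bool = True) -> None:
        self._shutting_down = True
        if wait:
            await self._shutdown.wait()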
GeneDer authored Jul 7, 2023
1 parent 95a51ce commit 267b14e
Showing 13 changed files with 457 additions and 74 deletions.
8 changes: 8 additions & 0 deletions python/ray/serve/BUILD
@@ -681,3 +681,11 @@ py_test(
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)

py_test(
name = "test_endpoint_state",
size = "small",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
25 changes: 23 additions & 2 deletions python/ray/serve/_private/application_state.py
@@ -177,9 +177,20 @@ def _delete_deployment(self, name):

def delete(self):
"""Delete the application"""
logger.info(f"Deleting application '{self._name}'")
logger.info(
f"Deleting application '{self._name}'",
extra={"log_to_stderr": False},
)
self._set_target_state(deleting=True)

def is_deleted(self) -> bool:
"""Check whether the application is already deleted.
For an application to be considered deleted, the target state has to be set to
deleting and all deployments have to be deleted.
"""
return self._target_state.deleting and len(self._get_live_deployments()) == 0

def apply_deployment_info(
self, deployment_name: str, deployment_info: DeploymentInfo
) -> None:
@@ -412,7 +423,7 @@ def update(self) -> bool:

# Check if app is ready to be deleted
if self._target_state.deleting:
return len(self._get_live_deployments()) == 0
return self.is_deleted()
return False

def get_checkpoint_data(self) -> ApplicationTargetState:
@@ -639,6 +650,16 @@ def shutdown(self) -> None:
for app_state in self._application_states.values():
app_state.delete()

def is_ready_for_shutdown(self) -> bool:
"""Return whether all applications have shut down.
Iterate through all application states and check that each application
is deleted.
"""
return all(
app_state.is_deleted() for app_state in self._application_states.values()
)

def _save_checkpoint_func(
self, *, writeahead_checkpoints: Optional[Dict[str, ApplicationTargetState]]
) -> None:
59 changes: 11 additions & 48 deletions python/ray/serve/_private/client.py
@@ -20,7 +20,6 @@
CLIENT_POLLING_INTERVAL_S,
CLIENT_CHECK_CREATION_POLLING_INTERVAL_S,
MAX_CACHED_HANDLES,
SERVE_NAMESPACE,
SERVE_DEFAULT_APP_NAME,
)
from ray.serve._private.deploy_utils import get_deploy_args
@@ -95,7 +94,7 @@ def __del__(self):
def __reduce__(self):
raise RayServeException(("Ray Serve client cannot be serialized."))

def shutdown(self) -> None:
def shutdown(self, timeout_s: float = 30.0) -> None:
"""Completely shut down the connected Serve instance.
Shuts down all processes and deletes all state associated with the
@@ -107,53 +106,17 @@ def shutdown(self) -> None:
del self.handle_cache[k]

if ray.is_initialized() and not self._shutdown:
ray.get(self._controller.shutdown.remote())
self._wait_for_deployments_shutdown()

ray.kill(self._controller, no_restart=True)

# Wait for the named actor entry gets removed as well.
started = time.time()
while True:
try:
ray.get_actor(self._controller_name, namespace=SERVE_NAMESPACE)
if time.time() - started > 5:
logger.warning(
"Waited 5s for Serve to shutdown gracefully but "
"the controller is still not cleaned up. "
"You can ignore this warning if you are shutting "
"down the Ray cluster."
)
break
except ValueError: # actor name is removed
break

self._shutdown = True

def _wait_for_deployments_shutdown(self, timeout_s: int = 60):
"""Waits for all deployments to be shut down and deleted.
Raises TimeoutError if this doesn't happen before timeout_s.
"""
start = time.time()
while time.time() - start < timeout_s:
deployment_statuses = self.get_all_deployment_statuses()
if len(deployment_statuses) == 0:
break
else:
logger.debug(
f"Waiting for shutdown, {len(deployment_statuses)} "
"deployments still alive."
try:
ray.get(self._controller.graceful_shutdown.remote(), timeout=timeout_s)
except ray.exceptions.RayActorError:
# Controller has been shut down.
pass
except TimeoutError:
logger.warning(
f"Controller failed to shut down within {timeout_s}s. "
"Check controller logs for more details."
)
time.sleep(CLIENT_POLLING_INTERVAL_S)
else:
live_names = [
deployment_status.name for deployment_status in deployment_statuses
]
raise TimeoutError(
f"Shutdown didn't complete after {timeout_s}s. "
f"Deployments still alive: {live_names}."
)
self._shutdown = True

def _wait_for_deployment_healthy(self, name: str, timeout_s: int = -1):
"""Waits for the named deployment to enter "HEALTHY" status.
15 changes: 14 additions & 1 deletion python/ray/serve/_private/deployment_state.py
@@ -1279,7 +1279,10 @@ def _set_target_state_deleting(self) -> None:
self._curr_status_info = DeploymentStatusInfo(
self._name, DeploymentStatus.UPDATING
)
logger.info(f"Deleting deployment {self._name}.")
logger.info(
f"Deleting deployment {self._name}.",
extra={"log_to_stderr": False},
)

def _set_target_state(self, target_info: DeploymentInfo) -> None:
"""Set the target state for the deployment to the provided info."""
@@ -2284,6 +2287,16 @@ def shutdown(self):
# TODO(jiaodong): Need to add some logic to prevent new replicas
# from being created once shutdown signal is sent.

def is_ready_for_shutdown(self) -> bool:
"""Return whether all deployments are shutdown.
Check there are no deployment states and no checkpoints.
"""
return (
len(self._deployment_states) == 0
and self._kv_store.get(CHECKPOINT_KEY) is None
)

def _save_checkpoint_func(
self, *, writeahead_checkpoints: Optional[Dict[str, Tuple]]
) -> None:
8 changes: 8 additions & 0 deletions python/ray/serve/_private/endpoint_state.py
@@ -34,6 +34,14 @@ def __init__(self, kv_store: KVStoreBase, long_poll_host: LongPollHost):
def shutdown(self):
self._kv_store.delete(CHECKPOINT_KEY)

def is_ready_for_shutdown(self) -> bool:
"""Returns whether the endpoint checkpoint has been deleted.
Get the endpoint checkpoint from the kv store. If it is None, then it has been
deleted.
"""
return self._kv_store.get(CHECKPOINT_KEY) is None

def _checkpoint(self):
self._kv_store.put(CHECKPOINT_KEY, cloudpickle.dumps(self._endpoints))

32 changes: 32 additions & 0 deletions python/ray/serve/_private/http_state.py
@@ -230,6 +230,27 @@ def shutdown(self):
self._shutting_down = True
ray.kill(self.actor_handle, no_restart=True)

def is_ready_for_shutdown(self) -> bool:
"""Return whether the HTTP proxy actor is shutdown.
For an HTTP proxy actor to be considered shutdown, it must be marked as
_shutting_down and the actor must be dead. If the actor is dead, the health
check will return RayActorError.
"""
if not self._shutting_down:
return False

try:
ray.get(self._actor_handle.check_health.remote(), timeout=0.001)
except ray.exceptions.RayActorError:
# The actor is dead, so it's ready for shutdown.
return True
except ray.exceptions.GetTimeoutError:
# The actor is still alive, so it's not ready for shutdown.
return False

return False
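The probe above relies on two Ray behaviors: calling a method on a dead actor raises `RayActorError`, while a live actor that simply has not replied within the tiny timeout raises `GetTimeoutError`. A standalone sketch of the same liveness check; `check_health` mirrors the method used in the diff, but any cheap actor method would serve:

import ray

def actor_is_dead(actor_handle, probe_timeout_s: float = 0.001) -> bool:
    """Probe actor liveness with a near-zero-timeout health check."""
    try:
        ray.get(actor_handle.check_health.remote(), timeout=probe_timeout_s)
    except ray.exceptions.RayActorError:
        return True   # the actor process is gone
    except ray.exceptions.GetTimeoutError:
        return False  # alive, just didn't answer within the tiny window
    return False      # answered successfully, definitely alive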


class HTTPState:
"""Manages all state for HTTP proxies in the system.
@@ -269,6 +290,17 @@ def shutdown(self) -> None:
for proxy_state in self._proxy_states.values():
proxy_state.shutdown()

def is_ready_for_shutdown(self) -> bool:
"""Return whether all proxies are shutdown.
Iterate through all proxy states and check if all their proxy actors
are shutdown.
"""
return all(
proxy_state.is_ready_for_shutdown()
for proxy_state in self._proxy_states.values()
)

def get_config(self):
return self._config

98 changes: 95 additions & 3 deletions python/ray/serve/controller.py
@@ -187,6 +187,9 @@ async def __init__(
worker_id=ray.get_runtime_context().get_worker_id(),
log_file_path=get_component_logger_file_path(),
)
self._shutting_down = False
self._shutdown = asyncio.Event()
self._shutdown_start_time = None

run_background_task(self.run_control_loop())

@@ -307,6 +310,12 @@ async def run_control_loop(self) -> None:
recovering_timeout = RECOVERING_LONG_POLL_BROADCAST_TIMEOUT_S
start_time = time.time()
while True:
if self._shutting_down:
try:
self.shutdown()
except Exception:
logger.exception("Exception during shutdown.")

if (
not self.done_recovering_event.is_set()
and time.time() - start_time > recovering_timeout
@@ -439,15 +448,84 @@ def get_root_url(self):
)
return http_config.root_url

def config_checkpoint_deleted(self) -> bool:
"""Returns whether the config checkpoint has been deleted.
Get the config checkpoint from the kv store. If it is None, then it has been
deleted.
"""
return self.kv_store.get(CONFIG_CHECKPOINT_KEY) is None

def shutdown(self):
"""Shuts down the serve instance completely."""
"""Shuts down the serve instance completely.
This method is only triggered when `self._shutting_down` is true. It
deletes the config checkpoint from the kv store, sets the application state to
deleting, deletes all deployments, and shuts down all HTTP proxies. Once all
these resources are released, it kills the controller actor.
"""
if not self._shutting_down:
return

if self._shutdown_start_time is None:
self._shutdown_start_time = time.time()

logger.info("Controller shutdown started!", extra={"log_to_stderr": False})
self.kv_store.delete(CONFIG_CHECKPOINT_KEY)
self.application_state_manager.shutdown()
self.deployment_state_manager.shutdown()
self.endpoint_state.shutdown()
if self.http_state:
self.http_state.shutdown()

config_checkpoint_deleted = self.config_checkpoint_deleted()
application_is_shutdown = self.application_state_manager.is_ready_for_shutdown()
deployment_is_shutdown = self.deployment_state_manager.is_ready_for_shutdown()
endpoint_is_shutdown = self.endpoint_state.is_ready_for_shutdown()
http_state_is_shutdown = (
self.http_state is None or self.http_state.is_ready_for_shutdown()
)
if (
config_checkpoint_deleted
and application_is_shutdown
and deployment_is_shutdown
and endpoint_is_shutdown
and http_state_is_shutdown
):
logger.warning(
"All resources have shut down, shutting down controller!",
extra={"log_to_stderr": False},
)
_controller_actor = ray.get_runtime_context().current_actor
self._shutdown.set()
ray.kill(_controller_actor, no_restart=True)
elif time.time() - self._shutdown_start_time > 10:
if not config_checkpoint_deleted:
logger.warning(
f"{CONFIG_CHECKPOINT_KEY} not yet deleted",
extra={"log_to_stderr": False},
)
if not application_is_shutdown:
logger.warning(
"application not yet shutdown",
extra={"log_to_stderr": False},
)
if not deployment_is_shutdown:
logger.warning(
"deployment not yet shutdown",
extra={"log_to_stderr": False},
)
if not endpoint_is_shutdown:
logger.warning(
"endpoint not yet shutdown",
extra={"log_to_stderr": False},
)
if not http_state_is_shutdown:
logger.warning(
"http_state not yet shutdown",
extra={"log_to_stderr": False},
)
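The checks above follow a small reusable pattern: poll a set of readiness predicates on every control-loop tick, and only start naming stragglers once a grace period has elapsed. A hedged sketch of that pattern; the 10s threshold mirrors the diff, while the `subsystems` mapping is illustrative:

import time

def shutdown_ready(subsystems, start_time, warn_after_s: float = 10.0) -> bool:
    """subsystems: mapping of name -> zero-arg readiness predicate."""
    status = {name: check() for name, check in subsystems.items()}
    if all(status.values()):
        return True  # safe to kill the controller actor
    if time.time() - start_time > warn_after_s:
        for name, ready in status.items():
            if not ready:
                print(f"{name} not yet shutdown")  # stand-in for logger.warning
    return False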

def deploy(
self,
name: str,
@@ -835,6 +913,20 @@ def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo):
"""
self.deployment_state_manager.record_multiplexed_replica_info(info)

async def graceful_shutdown(self, wait: bool = True):
"""Set the shutting down flag on controller to signal shutdown in
run_control_loop().
This signals the controller to proceed with the shutdown process so it can
shut down gracefully. If `wait` is true, it also waits until the shutdown
event is set.
"""
self._shutting_down = True
if not wait:
return

await self._shutdown.wait()
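On the client side (see the `client.py` hunk above), this pairs with a single blocking call: the caller either returns once the shutdown event is set, or catches `RayActorError` because the controller has already killed itself. A minimal caller sketch under those assumptions; the standalone function wrapper is illustrative:

import ray

def request_graceful_shutdown(controller_handle, timeout_s: float = 30.0) -> None:
    try:
        # Returns once the controller sets its _shutdown event; raises
        # RayActorError if the controller actor is already dead.
        ray.get(controller_handle.graceful_shutdown.remote(), timeout=timeout_s)
    except ray.exceptions.RayActorError:
        pass  # controller finished tearing itself down
    except TimeoutError:  # GetTimeoutError subclasses TimeoutError
        print(f"Controller failed to shut down within {timeout_s}s.")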


@ray.remote(num_cpus=0, max_calls=1)
def deploy_serve_application(
@@ -943,7 +1035,7 @@ def __init__(
http_proxy_port: int = 8000,
):
try:
self._controller = ray.get_actor(controller_name, namespace="serve")
self._controller = ray.get_actor(controller_name, namespace=SERVE_NAMESPACE)
except ValueError:
self._controller = None
if self._controller is None:
Expand All @@ -956,7 +1048,7 @@ def __init__(
max_restarts=-1,
max_task_retries=-1,
resources={HEAD_NODE_RESOURCE_NAME: 0.001},
namespace="serve",
namespace=SERVE_NAMESPACE,
max_concurrency=CONTROLLER_MAX_CONCURRENCY,
).remote(
controller_name,