modify on_error:destroy behavior for deployments #317

Merged
merged 6 commits into from Sep 8, 2021
Changes from 2 commits
36 changes: 25 additions & 11 deletions servo/connectors/kubernetes.py
@@ -1439,6 +1439,16 @@ async def delete(self, options:kubernetes_asyncio.client.V1DeleteOptions = None)
body=options,
)

async def scale_to_zero(self) -> None:
"""his is used as a "soft" 'delete'/'destroy'.
Since the Deployment object is used as a wrapper around an existing k8s object that we did not create,
it shouldn't be destroyed. Instead, the deployments pods are destroyed by scaling it to 0 replicas.
"""

await self.refresh()
self.replicas = 0
await self.patch()

async def refresh(self) -> None:
"""Refresh the underlying Kubernetes Deployment resource."""
async with self.api_client() as api_client:
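For context, the scale_to_zero added above boils down to a replicas patch against the apps/v1 API. A standalone sketch of the same pattern using kubernetes_asyncio directly (the deployment name and namespace are illustrative, not part of this PR):

import asyncio
import kubernetes_asyncio

async def scale_deployment_to_zero(name: str, namespace: str) -> None:
    # Patch spec.replicas to 0: the Deployment object (and its configuration)
    # survives, but the controller tears down its pods.
    await kubernetes_asyncio.config.load_kube_config()
    async with kubernetes_asyncio.client.ApiClient() as api:
        apps = kubernetes_asyncio.client.AppsV1Api(api)
        await apps.patch_namespaced_deployment(
            name=name,
            namespace=namespace,
            body={"spec": {"replicas": 0}},
        )

asyncio.run(scale_deployment_to_zero("fiber-http", "default"))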
@@ -2820,8 +2830,8 @@ async def handle_error(self, error: Exception) -> bool:
elif self.on_failure == FailureMode.rollback:
await self.rollback(error)

elif self.on_failure == FailureMode.destroy:
await self.destroy(error)
elif self.on_failure == FailureMode.shutdown:
await self.shutdown(error)

else:
# Trap any new modes that need to be handled
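Taken together, this branch ladder is a dispatch over FailureMode. A hedged reconstruction of its overall shape (the ignore and exception branches are inferred from the enum further down, not copied from this file):

async def handle_error(self, error: Exception) -> bool:
    if self.on_failure == FailureMode.ignore:
        return True  # swallow the error and continue
    elif self.on_failure == FailureMode.rollback:
        await self.rollback(error)
    elif self.on_failure == FailureMode.shutdown:
        await self.shutdown(error)
    elif self.on_failure == FailureMode.exception:
        raise error
    else:
        # Trap any new modes that need to be handled
        raise ValueError(f"unhandled FailureMode: {self.on_failure}")
    return True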
@@ -2847,9 +2857,9 @@ async def rollback(self, error: Optional[Exception] = None) -> None:
...

@abc.abstractmethod
async def destroy(self, error: Optional[Exception] = None) -> None:
async def shutdown(self, error: Optional[Exception] = None) -> None:
"""
Asynchronously destroy the Optimization.
Asynchronously shut down the Optimization.

Args:
error: An optional exception that contextualizes the cause of the shutdown.
@@ -2991,16 +3001,16 @@ async def rollback(self, error: Optional[Exception] = None) -> None:
timeout=self.timeout.total_seconds(),
)

async def destroy(self, error: Optional[Exception] = None) -> None:
async def shutdown(self, error: Optional[Exception] = None) -> None:
"""
Initiates the asynchronous deletion of the Deployment under optimization.
Initiates the asynchronous deletion of all pods in the Deployment under optimization.

Args:
error: An optional error that triggered the shutdown.
"""
self.logger.info(f"adjustment failed: destroying deployment...")
self.logger.info(f"adjustment failed: shutting down deployment's pods...")
await asyncio.wait_for(
self.deployment.delete(),
self.deployment.scale_to_zero(),
timeout=self.timeout.total_seconds(),
)
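One behavioral note on the wait above: asyncio.wait_for cancels the in-flight scale-to-zero call and raises asyncio.TimeoutError once the configured window lapses, so a hung patch surfaces as an error instead of blocking the adjustment. A minimal illustration (the function name and arguments are stand-ins):

import asyncio

async def shutdown_with_timeout(deployment, timeout_seconds: float) -> None:
    try:
        # wait_for cancels scale_to_zero() if it overruns the window
        await asyncio.wait_for(deployment.scale_to_zero(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Propagate so the caller's failure handling (handle_error) engages
        raise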

@@ -3643,13 +3653,17 @@ async def destroy(self, error: Optional[Exception] = None) -> None:

self.logger.success(f'destroyed tuning Pod "{self.tuning_pod_name}"')

async def shutdown(self, error: Optional[Exception] = None) -> None:
# (not called - see handle_error(), defined for completeness)
await self.destroy(error)

async def handle_error(self, error: Exception) -> bool:
if self.on_failure == FailureMode.rollback or self.on_failure == FailureMode.destroy:
if self.on_failure == FailureMode.rollback or self.on_failure == FailureMode.shutdown:
# Ensure that we chain any underlying exceptions that may occur
try:
if self.on_failure == FailureMode.rollback:
self.logger.warning(
f"cannot rollback a tuning Pod: falling back to destroy: {error}"
f"cannot rollback a tuning Pod: falling back to shutdown: {error}"
)

await asyncio.wait_for(self.destroy(), timeout=self.timeout.total_seconds())
@@ -4022,7 +4036,7 @@ class FailureMode(str, enum.Enum):
"""

rollback = "rollback"
destroy = "destroy"
shutdown = "shutdown"
ignore = "ignore"
exception = "exception"

24 changes: 12 additions & 12 deletions tests/connectors/kubernetes_test.py
@@ -1241,7 +1241,7 @@ async def test_adjust_tuning_cpu_with_settlement(self, tuning_config, namespace,

async def test_adjust_handle_error_respects_nested_config(self, config: KubernetesConfiguration, kube: kubetest.client.TestClient):
config.timeout = "3s"
config.on_failure = FailureMode.destroy
config.on_failure = FailureMode.shutdown
config.cascade_common_settings(overwrite=True)
config.deployments[0].on_failure = FailureMode.exception
config.deployments[0].containers[0].memory.max = "256Gi"
@@ -1256,7 +1256,9 @@ async def test_adjust_handle_error_respects_nested_config(self, config: KubernetesConfiguration
description = await connector.adjust([adjustment])
debug(description)

await Deployment.read("fiber-http", kube.namespace)
deployment = await Deployment.read("fiber-http", kube.namespace)
# check deployment was not scaled to 0 replicas (i.e., the outer-level 'shutdown' was overridden)
assert deployment.obj.spec.replicas != 0

# async def test_apply_no_changes(self):
# # resource_version stays the same and early exits
@@ -1473,7 +1475,7 @@ async def test_adjust_deployment_settlement_failed(
) -> None:
config.timeout = "15s"
config.settlement = "20s"
config.deployments[0].on_failure = FailureMode.destroy
config.deployments[0].on_failure = FailureMode.shutdown
connector = KubernetesConnector(config=config)

adjustment = Adjustment(
@@ -1494,11 +1496,9 @@ async def test_adjust_deployment_settlement_failed(
except AssertionError as e:
raise e from rejection_info.value

# Validate deployment destroyed
with pytest.raises(kubernetes.client.exceptions.ApiException) as not_found_error:
kubetest_deployment_becomes_unready.refresh()

assert not_found_error.value.status == 404 and not_found_error.value.reason == "Not Found", str(not_found_error.value)
# Validate deployment scaled down to 0 instances
kubetest_deployment_becomes_unready.refresh()
assert kubetest_deployment_becomes_unready.obj.spec.replicas == 0

async def test_adjust_tuning_never_ready(
self,
@@ -1507,7 +1507,7 @@ async def test_adjust_tuning_never_ready(
kube: kubetest.client.TestClient
) -> None:
tuning_config.timeout = "30s"
tuning_config.on_failure = FailureMode.destroy
tuning_config.on_failure = FailureMode.shutdown
tuning_config.cascade_common_settings(overwrite=True)
connector = KubernetesConnector(config=tuning_config)

@@ -1540,7 +1540,7 @@ async def test_adjust_tuning_oom_killed(
kube: kubetest.client.TestClient
) -> None:
tuning_config.timeout = "25s"
tuning_config.on_failure = FailureMode.destroy
tuning_config.on_failure = FailureMode.shutdown
tuning_config.cascade_common_settings(overwrite=True)
connector = KubernetesConnector(config=tuning_config)

@@ -1574,8 +1574,8 @@ async def test_adjust_tuning_settlement_failed(
) -> None:
tuning_config.timeout = "25s"
tuning_config.settlement = "15s"
tuning_config.on_failure = FailureMode.destroy
tuning_config.deployments[0].on_failure = FailureMode.destroy
tuning_config.on_failure = FailureMode.shutdown
tuning_config.deployments[0].on_failure = FailureMode.shutdown
connector = KubernetesConnector(config=tuning_config)

