Skip to content

Commit

Permalink
Add external-load-balancer relation (#234)
Browse files Browse the repository at this point in the history
* Add external-load-balancer relation
* Ensure SANs from apiserver cert and refresh certs if needed
---------
Co-authored-by: Adam Dyess <adam.dyess@canonical.com>
  • Loading branch information
HomayoonAlimohammadi authored Jan 29, 2025
1 parent 2ec3c65 commit d019173
Show file tree
Hide file tree
Showing 13 changed files with 521 additions and 16 deletions.
2 changes: 2 additions & 0 deletions charms/worker/k8s/charmcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,5 @@ requires:
interface: external_cloud_provider
gcp:
interface: gcp-integration
external-load-balancer:
interface: loadbalancer
96 changes: 95 additions & 1 deletion charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class utilises different connection factories (UnixSocketConnectionFactory
import logging
import socket
from contextlib import contextmanager
from datetime import datetime
from http.client import HTTPConnection, HTTPException
from typing import Any, Dict, Generator, List, Optional, Type, TypeVar

Expand All @@ -45,7 +46,7 @@ class utilises different connection factories (UnixSocketConnectionFactory

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 5
LIBPATCH = 6

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -598,6 +599,68 @@ class GetKubeConfigResponse(BaseRequestModel):
metadata: KubeConfigMetadata


class RefreshCertificatesPlanMetadata(BaseModel, allow_population_by_field_name=True):
"""Metadata for the certificates plan response.
Attributes:
seed (int): The seed for the new certificates.
certificate_signing_requests (Optional[list[str]]): List of names
of the CertificateSigningRequests that need to be signed externally (for worker nodes).
"""

# NOTE(Hue): Alias is because of a naming mismatch:
# https://github.com/canonical/k8s-snap-api/blob/6d4139295b37800fb2b3fcce9fc260e6caf284b9/api/v1/rpc_refresh_certificates_plan.go#L12
seed: Optional[int] = Field(default=None, alias="seconds")
certificate_signing_requests: Optional[list[str]] = Field(
default=None, alias="certificate-signing-requests"
)


class RefreshCertificatesPlanResponse(BaseRequestModel):
"""Response model for the refresh certificates plan.
Attributes:
metadata (RefreshCertificatesPlanMetadata): Metadata for the certificates plan response.
"""

metadata: RefreshCertificatesPlanMetadata


class RefreshCertificatesRunRequest(BaseModel, allow_population_by_field_name=True):
"""Request model for running the refresh certificates run.
Attributes:
seed (int): The seed for the new certificates from plan response.
expiration_seconds (int): The duration of the new certificates.
extra_sans (list[str]): List of extra sans for the new certificates.
"""

seed: int
expiration_seconds: int = Field(alias="expiration-seconds")
extra_sans: Optional[list[str]] = Field(alias="extra-sans")


class RefreshCertificatesRunMetadata(BaseModel, allow_population_by_field_name=True):
"""Metadata for RefreshCertificatesRunResponse.
Attributes:
expiration_seconds (int): The duration of the new certificates
(might not match the requested value).
"""

expiration_seconds: int = Field(alias="expiration-seconds")


class RefreshCertificatesRunResponse(BaseRequestModel):
"""Response model for the refresh certificates run.
Attributes:
metadata (RefreshCertificatesRunMetadata): Metadata for the certificates run response.
"""

metadata: RefreshCertificatesRunMetadata


T = TypeVar("T", bound=BaseRequestModel)


Expand Down Expand Up @@ -920,3 +983,34 @@ def get_kubeconfig(self, server: Optional[str]) -> str:
body = {"server": server or ""}
response = self._send_request(endpoint, "GET", GetKubeConfigResponse, body)
return response.metadata.kubeconfig

def refresh_certs(
self, extra_sans: list[str], expiration_seconds: Optional[int] = None
) -> None:
"""Refresh the certificates for the cluster.
Args:
extra_sans (list[str]): List of extra SANs for the certificates.
expiration_seconds (Optional[int]): The duration of the new certificates.
"""
plan_endpoint = "/1.0/k8sd/refresh-certs/plan"
plan_resp = self._send_request(plan_endpoint, "POST", RefreshCertificatesPlanResponse, {})

# NOTE(Hue): Default certificate expiration is set to 20 years:
# https://github.com/canonical/k8s-snap/blob/32e35128394c0880bcc4ce87447f4247cc315ba5/src/k8s/pkg/k8sd/app/hooks_bootstrap.go#L331-L338
if expiration_seconds is None:
now = datetime.now()
twenty_years_later = datetime(
now.year + 20, now.month, now.day, now.hour, now.minute, now.second
)
expiration_seconds = int((twenty_years_later - now).total_seconds())

run_endpoint = "/1.0/k8sd/refresh-certs/run"
run_req = RefreshCertificatesRunRequest( # type: ignore
seed=plan_resp.metadata.seed,
expiration_seconds=expiration_seconds,
extra_sans=extra_sans,
)

body = run_req.dict(exclude_none=True, by_alias=True)
self._send_request(run_endpoint, "POST", RefreshCertificatesRunResponse, body)
1 change: 1 addition & 0 deletions charms/worker/k8s/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ websocket-client==1.8.0
poetry-core==1.9.1
lightkube==0.17.1
httpx==0.27.2
loadbalancer_interface == 1.2.0
143 changes: 137 additions & 6 deletions charms/worker/k8s/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,13 @@
from charms.reconciler import Reconciler
from cloud_integration import CloudIntegration
from cos_integration import COSIntegration
from endpoints import build_url
from events import update_status
from inspector import ClusterInspector
from kube_control import configure as configure_kube_control
from literals import (
APISERVER_CERT,
APISERVER_PORT,
CLUSTER_RELATION,
CLUSTER_WORKER_RELATION,
CONTAINERD_BASE_PATH,
Expand All @@ -80,13 +83,19 @@
DEPENDENCIES,
ETC_KUBERNETES,
ETCD_RELATION,
EXTERNAL_LOAD_BALANCER_PORT,
EXTERNAL_LOAD_BALANCER_RELATION,
EXTERNAL_LOAD_BALANCER_REQUEST_NAME,
EXTERNAL_LOAD_BALANCER_RESPONSE_NAME,
K8SD_PORT,
K8SD_SNAP_SOCKET,
KUBECONFIG,
KUBECTL_PATH,
SUPPORTED_DATASTORES,
)
from loadbalancer_interface import LBProvider
from ops.interface_kube_control import KubeControlProvides
from pki import get_certificate_sans
from pydantic import SecretStr
from snap import management as snap_management
from snap import version as snap_version
Expand All @@ -98,7 +107,7 @@
log = logging.getLogger(__name__)


def _get_public_address() -> str:
def _get_juju_public_address() -> str:
"""Get public address from juju.
Returns:
Expand Down Expand Up @@ -178,6 +187,7 @@ def __init__(self, *args):
)
self._upgrade_snap = False
self._stored.set_default(is_dying=False, cluster_name=str(), upgrade_granted=False)
self._external_load_balancer_address = ""

self.cos_agent = COSAgentProvider(
self,
Expand All @@ -196,6 +206,7 @@ def __init__(self, *args):
self.etcd = EtcdReactiveRequires(self)
self.kube_control = KubeControlProvides(self, endpoint="kube-control")
self.framework.observe(self.on.get_kubeconfig_action, self._get_external_kubeconfig)
self.external_load_balancer = LBProvider(self, EXTERNAL_LOAD_BALANCER_RELATION)

def _k8s_info(self, event: ops.EventBase):
"""Send cluster information on the kubernetes-info relation.
Expand Down Expand Up @@ -377,10 +388,24 @@ def _check_k8sd_ready(self):

def _get_extra_sans(self):
"""Retrieve the certificate extra SANs."""
# Get the extra SANs from the configuration
extra_sans_str = str(self.config.get("kube-apiserver-extra-sans") or "")
configured_sans = {san for san in extra_sans_str.strip().split() if san}
all_sans = configured_sans | set([_get_public_address()])
return sorted(all_sans)
extra_sans = set(extra_sans_str.strip().split())

# Add the ingress addresses of all units
extra_sans.add(_get_juju_public_address())
binding = self.model.get_binding(CLUSTER_RELATION)
addresses = binding and binding.network.ingress_addresses
if addresses:
log.info("Adding ingress addresses to extra SANs")
extra_sans |= {str(addr) for addr in addresses}

# Add the external load balancer address
if self._external_load_balancer_address:
log.info("Adding external load balancer address to extra SANs")
extra_sans.add(self._external_load_balancer_address)

return sorted(extra_sans)

def _assemble_bootstrap_config(self):
"""Assemble the bootstrap configuration for the Kubernetes cluster.
Expand All @@ -400,6 +425,45 @@ def _assemble_bootstrap_config(self):
config.extra_args.craft(self.config, bootstrap_config, cluster_name)
return bootstrap_config

def _configure_external_load_balancer(self) -> None:
"""Configure the external load balancer for the application.
This method checks if the external load balancer is available and then
proceeds to configure it by sending a request with the necessary parameters.
It waits for a response from the external load balancer and handles any errors that
may occur during the process.
"""
if not self.is_control_plane:
log.info("External load balancer is only configured for control-plane units.")
return

if not self.external_load_balancer.is_available:
log.info("External load balancer relation is not available. Skipping setup.")
return

status.add(ops.MaintenanceStatus("Configuring external loadBalancer"))

req = self.external_load_balancer.get_request(EXTERNAL_LOAD_BALANCER_REQUEST_NAME)
req.protocol = req.protocols.tcp
req.port_mapping = {EXTERNAL_LOAD_BALANCER_PORT: APISERVER_PORT}
req.public = True
if not req.health_checks:
req.add_health_check(protocol=req.protocols.https, port=APISERVER_PORT, path="/livez")
self.external_load_balancer.send_request(req)
log.info("External load balancer request was sent")

resp = self.external_load_balancer.get_response(EXTERNAL_LOAD_BALANCER_RESPONSE_NAME)
if not resp:
msg = "No response from external load balancer"
status.add(ops.WaitingStatus(msg))
raise ReconcilerError(msg)
if resp.error:
msg = f"External load balancer error: {resp.error}"
status.add(ops.BlockedStatus(msg))
raise ReconcilerError(msg)

self._external_load_balancer_address = resp.address

@on_error(
ops.WaitingStatus("Waiting to bootstrap k8s snap"),
ReconcilerError,
Expand Down Expand Up @@ -909,6 +973,7 @@ def _reconcile(self, event: ops.EventBase):
self._update_kubernetes_version()
if self.lead_control_plane:
self._k8s_info(event)
self._configure_external_load_balancer()
self._bootstrap_k8s_snap()
self._ensure_cluster_config()
self._create_cluster_tokens()
Expand All @@ -924,6 +989,7 @@ def _reconcile(self, event: ops.EventBase):
if self.is_control_plane:
self._copy_internal_kubeconfig()
self._expose_ports()
self._ensure_cert_sans()

def _evaluate_removal(self, event: ops.EventBase) -> bool:
"""Determine if my unit is being removed.
Expand Down Expand Up @@ -1081,14 +1147,79 @@ def _get_external_kubeconfig(self, event: ops.ActionEvent):
try:
server = event.params.get("server")
if not server:
log.info("No server requested, use public-address")
server = f"{_get_public_address()}:6443"
log.info("No server requested, use public address")

server = self._get_public_address()
if not server:
event.fail("Failed to get public address. Check logs for details.")
return

port = (
str(EXTERNAL_LOAD_BALANCER_PORT)
if self.external_load_balancer.is_available
else str(APISERVER_PORT)
)

server = build_url(server, port, "https")
log.info("Formatted server address: %s", server)
log.info("Requesting kubeconfig for server=%s", server)
resp = self.api_manager.get_kubeconfig(server)
event.set_results({"kubeconfig": resp})
except (InvalidResponseError, K8sdConnectionError) as e:
event.fail(f"Failed to retrieve kubeconfig: {e}")

def _get_public_address(self) -> str:
"""Get the most public address either from external load balancer or from juju.
If the external load balancer is available and the unit is a control-plane unit,
the external load balancer address will be used. Otherwise, the juju public address
will be used.
NOTE: Don't ignore the unit's IP in the extra SANs just because there's a load balancer.
Returns:
str: The public ip address of the unit.
"""
if self._external_load_balancer_address:
log.info("Using external load balancer address as the public address")
return self._external_load_balancer_address

log.info("Using juju public address as the public address")
return _get_juju_public_address()

@on_error(
ops.WaitingStatus("Ensuring SANs are up-to-date"),
InvalidResponseError,
K8sdConnectionError,
)
def _ensure_cert_sans(self):
"""Ensure the certificate SANs are up-to-date.
This method checks if the certificate SANs match the required extra SANs.
If they are not, the certificates are refreshed with the new SANs.
"""
if not self.is_control_plane:
return

extra_sans = self._get_extra_sans()
if not extra_sans:
log.info("No extra SANs to update")
return

dns_sans, ip_sans = get_certificate_sans(APISERVER_CERT)
ip_sans = [str(ip) for ip in ip_sans]
all_cert_sans = dns_sans + ip_sans

missing_sans = [san for san in extra_sans if san not in all_cert_sans]
if missing_sans:
log.info(
"%s not in cert SANs. Refreshing certs with new SANs: %s", missing_sans, extra_sans
)
status.add(ops.MaintenanceStatus("Refreshing Certificates"))
self.api_manager.refresh_certs(extra_sans)
log.info("Certificates have been refreshed")

log.info("Certificate SANs are up-to-date")


if __name__ == "__main__": # pragma: nocover
ops.main(K8sCharm)
Loading

0 comments on commit d019173

Please sign in to comment.