From 914f27d1eeaae0e14bce3b6adbc7cbd53c2422e8 Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Tue, 10 Sep 2024 17:28:18 -0400 Subject: [PATCH 1/6] Improve typed socket connection robustness --- plane/src/drone/mod.rs | 140 ++++++++++++++------------ plane/src/typed_socket/mod.rs | 9 +- plane/src/typed_unix_socket/client.rs | 1 - 3 files changed, 82 insertions(+), 68 deletions(-) diff --git a/plane/src/drone/mod.rs b/plane/src/drone/mod.rs index 1c15c0986..639e44ba1 100644 --- a/plane/src/drone/mod.rs +++ b/plane/src/drone/mod.rs @@ -16,7 +16,7 @@ use crate::{ names::DroneName, protocol::{BackendAction, MessageFromDrone, MessageToDrone, RenewKeyResponse}, signals::wait_for_shutdown_signal, - typed_socket::client::TypedSocketConnector, + typed_socket::{client::TypedSocketConnector, TypedSocketSender}, types::{BackendState, ClusterName, DronePoolName}, }; use anyhow::{anyhow, Result}; @@ -98,74 +98,88 @@ pub async fn drone_loop( break; }; - match message { - MessageToDrone::Action(BackendActionMessage { - action_id, - backend_id, - action, - .. - }) => { - tracing::info!( - backend_id = backend_id.as_value(), - action = action.as_value(), - "Received action." - ); - - if let BackendAction::Spawn { key, .. } = &action { - if key.deadlines.soft_terminate_at.0 < Utc::now() { - tracing::warn!( - backend_id = backend_id.as_value(), - "Received spawn request with deadline in the past. Ignoring." - ); - } - - // Register the key with the key manager, ensuring that it will be refreshed. - let result = key_manager - .lock() - .expect("Key manager lock poisoned.") - .register_key(backend_id.clone(), key.clone()); - - if !result { - tracing::warn!( - backend = backend_id.as_value(), - "Key already registered for backend. Ignoring spawn request." - ); - continue; - } - } + let key_manager = key_manager.clone(); + tokio::spawn(handle_message( + message, + key_manager, + socket.sender(|x| x), + executor.clone(), + )); + } + } + + pub async fn handle_message( + message: MessageToDrone, + key_manager: Arc>, + sender: TypedSocketSender, + executor: Arc, + ) { + match message { + MessageToDrone::Action(BackendActionMessage { + action_id, + backend_id, + action, + .. + }) => { + tracing::info!( + backend_id = backend_id.as_value(), + action = action.as_value(), + "Received action." + ); - if let Err(err) = executor.apply_action(&backend_id, &action).await { - tracing::error!(?err, "Error applying action."); - continue; + if let BackendAction::Spawn { key, .. } = &action { + if key.deadlines.soft_terminate_at.0 < Utc::now() { + tracing::warn!( + backend_id = backend_id.as_value(), + "Received spawn request with deadline in the past. Ignoring." + ); } - if let Err(err) = socket.send(MessageFromDrone::AckAction { action_id }).await { - tracing::error!(?err, "Error sending ack."); - continue; + // Register the key with the key manager, ensuring that it will be refreshed. + let result = key_manager + .lock() + .expect("Key manager lock poisoned.") + .register_key(backend_id.clone(), key.clone()); + + if !result { + tracing::warn!( + backend = backend_id.as_value(), + "Key already registered for backend. Ignoring spawn request." + ); + return; } } - MessageToDrone::AckEvent { event_id } => { - if let Err(err) = executor.ack_event(event_id) { - tracing::error!(?err, "Error acking event."); - } + + if let Err(err) = executor.apply_action(&backend_id, &action).await { + tracing::error!(?err, "Error applying action."); + return; } - MessageToDrone::RenewKeyResponse(renew_key_response) => { - let RenewKeyResponse { backend, deadlines } = renew_key_response; - tracing::info!( - backend_id = backend.as_value(), - deadlines = deadlines.as_value(), - "Received key renewal response." - ); - - if let Some(deadlines) = deadlines { - key_manager - .lock() - .expect("Key manager lock poisoned.") - .update_deadlines(&backend, deadlines); - } else { - // TODO: we could begin the graceful termiation here. - tracing::warn!("Key renewal failed."); - } + + if let Err(err) = sender.send(MessageFromDrone::AckAction { action_id }) { + tracing::error!(?err, "Error acking action."); + } + } + MessageToDrone::AckEvent { event_id } => { + if let Err(err) = executor.ack_event(event_id) { + tracing::error!(?err, "Error acking event."); + } + } + MessageToDrone::RenewKeyResponse(renew_key_response) => { + let RenewKeyResponse { backend, deadlines } = renew_key_response; + tracing::info!( + backend_id = backend.as_value(), + deadlines = deadlines.as_value(), + "Received key renewal response." + ); + + if let Some(deadlines) = deadlines { + key_manager + .lock() + .expect("Key manager lock poisoned.") + .update_deadlines(&backend, deadlines); + } else { + // TODO: we could begin the graceful termiation here. + tracing::warn!("Key renewal failed."); } } } diff --git a/plane/src/typed_socket/mod.rs b/plane/src/typed_socket/mod.rs index 4c20425fa..8b23beedb 100644 --- a/plane/src/typed_socket/mod.rs +++ b/plane/src/typed_socket/mod.rs @@ -68,10 +68,11 @@ impl TypedSocketSender { impl TypedSocket { pub async fn send(&mut self, message: T) -> Result<(), PlaneClientError> { - self.send - .send(SocketAction::Send(message)) - .await - .map_err(|_| PlaneClientError::SendFailed)?; + if let Err(e) = self.send.send(SocketAction::Send(message)).await { + tracing::error!(?e, "Failed to send message on websocket. Closing receiver."); + self.recv.close(); + return Err(PlaneClientError::SendFailed); + } Ok(()) } diff --git a/plane/src/typed_unix_socket/client.rs b/plane/src/typed_unix_socket/client.rs index e2eeef0f0..30436beac 100644 --- a/plane/src/typed_unix_socket/client.rs +++ b/plane/src/typed_unix_socket/client.rs @@ -145,7 +145,6 @@ where // Task to handle receiving messages let recv_future = { - let event_tx = event_tx.clone(); let response_map = Arc::clone(&response_map); async move { loop { From e74ad9612cbdd21a9e4b24f9d34b448b1e582d86 Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Tue, 10 Sep 2024 18:06:47 -0400 Subject: [PATCH 2/6] old send behavior --- plane/plane-tests/tests/backend_actions.rs | 2 -- plane/plane-tests/tests/backend_lifecycle.rs | 2 -- plane/plane-tests/tests/dns_api.rs | 3 -- plane/plane-tests/tests/proxy_cert_lease.rs | 5 --- plane/src/admin.rs | 6 ++-- plane/src/controller/dns.rs | 2 +- plane/src/controller/drone.rs | 16 ++++----- plane/src/controller/proxy.rs | 35 +++++--------------- plane/src/dns/mod.rs | 2 +- plane/src/typed_socket/mod.rs | 10 +++--- 10 files changed, 23 insertions(+), 60 deletions(-) diff --git a/plane/plane-tests/tests/backend_actions.rs b/plane/plane-tests/tests/backend_actions.rs index 623666a37..952ad0837 100644 --- a/plane/plane-tests/tests/backend_actions.rs +++ b/plane/plane-tests/tests/backend_actions.rs @@ -72,7 +72,6 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) { .send(MessageFromDrone::Heartbeat(Heartbeat { local_time: LoggableTime(Utc::now()), })) - .await .unwrap(); // Wait for the drone to be registered. @@ -132,7 +131,6 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) { drone_connection .send(MessageFromDrone::AckAction { action_id }) - .await .unwrap(); // Drone connections should always be closed to prevent a warning, but this diff --git a/plane/plane-tests/tests/backend_lifecycle.rs b/plane/plane-tests/tests/backend_lifecycle.rs index b7783bd00..46274efb0 100644 --- a/plane/plane-tests/tests/backend_lifecycle.rs +++ b/plane/plane-tests/tests/backend_lifecycle.rs @@ -146,7 +146,6 @@ async fn backend_lifecycle(env: TestEnvironment) { .send(MessageFromProxy::RouteInfoRequest(RouteInfoRequest { token: response.token.clone(), })) - .await .unwrap(); let result = proxy.recv().with_timeout(10).await.unwrap().unwrap(); @@ -179,7 +178,6 @@ async fn backend_lifecycle(env: TestEnvironment) { tracing::info!("Sending keepalive."); proxy .send(MessageFromProxy::KeepAlive(response.backend_id.clone())) - .await .unwrap(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; diff --git a/plane/plane-tests/tests/dns_api.rs b/plane/plane-tests/tests/dns_api.rs index 67b43d9cc..c886b0522 100644 --- a/plane/plane-tests/tests/dns_api.rs +++ b/plane/plane-tests/tests/dns_api.rs @@ -31,7 +31,6 @@ async fn dns_api(env: TestEnvironment) { .send(MessageFromProxy::CertManagerRequest( CertManagerRequest::CertLeaseRequest, )) - .await .unwrap(); let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse { @@ -47,7 +46,6 @@ async fn dns_api(env: TestEnvironment) { txt_value: "foobaz".to_string(), }, )) - .await .unwrap(); let MessageToProxy::CertManagerResponse(CertManagerResponse::SetTxtRecordResponse { @@ -61,7 +59,6 @@ async fn dns_api(env: TestEnvironment) { .send(MessageFromDns::TxtRecordRequest { cluster: env.cluster.clone(), }) - .await .unwrap(); let MessageToDns::TxtRecordResponse { cluster, txt_value } = dns_client.recv().await.unwrap(); diff --git a/plane/plane-tests/tests/proxy_cert_lease.rs b/plane/plane-tests/tests/proxy_cert_lease.rs index dfd7120b4..233cdbe06 100644 --- a/plane/plane-tests/tests/proxy_cert_lease.rs +++ b/plane/plane-tests/tests/proxy_cert_lease.rs @@ -21,7 +21,6 @@ async fn request_dns_lease(env: TestEnvironment) { .send(MessageFromProxy::CertManagerRequest( CertManagerRequest::CertLeaseRequest, )) - .await .unwrap(); let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse { @@ -47,7 +46,6 @@ async fn request_dns_lease_fails_when_held(env: TestEnvironment) { .send(MessageFromProxy::CertManagerRequest( CertManagerRequest::CertLeaseRequest, )) - .await .unwrap(); let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse { @@ -68,7 +66,6 @@ async fn request_dns_lease_fails_when_held(env: TestEnvironment) { .send(MessageFromProxy::CertManagerRequest( CertManagerRequest::CertLeaseRequest, )) - .await .unwrap(); let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse { @@ -82,7 +79,6 @@ async fn request_dns_lease_fails_when_held(env: TestEnvironment) { .send(MessageFromProxy::CertManagerRequest( CertManagerRequest::ReleaseCertLease, )) - .await .unwrap(); // Avoid race. @@ -92,7 +88,6 @@ async fn request_dns_lease_fails_when_held(env: TestEnvironment) { .send(MessageFromProxy::CertManagerRequest( CertManagerRequest::CertLeaseRequest, )) - .await .unwrap(); let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse { diff --git a/plane/src/admin.rs b/plane/src/admin.rs index 1f34d6a4a..582c6b0e8 100644 --- a/plane/src/admin.rs +++ b/plane/src/admin.rs @@ -322,8 +322,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE conn.send(MessageFromProxy::CertManagerRequest( CertManagerRequest::CertLeaseRequest, - )) - .await?; + ))?; let response = conn.recv().await.expect("Failed to receive response"); @@ -346,8 +345,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE CertManagerRequest::SetTxtRecord { txt_value: message.clone(), }, - )) - .await?; + ))?; let response = conn.recv().await.expect("Failed to receive response"); diff --git a/plane/src/controller/dns.rs b/plane/src/controller/dns.rs index c3bd2ffe7..44ee931df 100644 --- a/plane/src/controller/dns.rs +++ b/plane/src/controller/dns.rs @@ -39,7 +39,7 @@ pub async fn dns_socket_inner( let message = MessageToDns::TxtRecordResponse { cluster, txt_value }; tracing::info!(?message, "Sending txt record response to drone."); - if let Err(err) = socket.send(message).await { + if let Err(err) = socket.send(message) { tracing::error!(?err, "Error sending txt record response to drone."); } } diff --git a/plane/src/controller/drone.rs b/plane/src/controller/drone.rs index 30907a585..2788b4bd3 100644 --- a/plane/src/controller/drone.rs +++ b/plane/src/controller/drone.rs @@ -61,11 +61,9 @@ pub async fn handle_message_from_drone( .update_state(&backend_event.backend_id, backend_event.state) .await?; - sender - .send(MessageToDrone::AckEvent { - event_id: backend_event.event_id, - }) - .await?; + sender.send(MessageToDrone::AckEvent { + event_id: backend_event.event_id, + })?; } MessageFromDrone::AckAction { action_id } => { controller @@ -96,9 +94,7 @@ pub async fn handle_message_from_drone( deadlines: Some(deadlines), }; - sender - .send(MessageToDrone::RenewKeyResponse(renew_key_response)) - .await?; + sender.send(MessageToDrone::RenewKeyResponse(renew_key_response))?; } } @@ -154,7 +150,7 @@ pub async fn process_pending_actions( let mut count = 0; for pending_action in db.backend_actions().pending_actions(*drone_id).await? { let message = MessageToDrone::Action(pending_action); - socket.send(message).await?; + socket.send(message)?; count += 1; } @@ -205,7 +201,7 @@ pub async fn drone_socket_inner( match backend_action_result { Some(backend_action) => { let message = MessageToDrone::Action(backend_action.payload); - if let Err(err) = socket.send(message).await { + if let Err(err) = socket.send(message) { tracing::error!(?err, "Error sending backend action to drone"); } } diff --git a/plane/src/controller/proxy.rs b/plane/src/controller/proxy.rs index b24b6c6f9..26c4e4618 100644 --- a/plane/src/controller/proxy.rs +++ b/plane/src/controller/proxy.rs @@ -37,10 +37,7 @@ pub async fn handle_route_info_request( token, route_info: Some(route_info), }; - if let Err(err) = socket - .send(MessageToProxy::RouteInfoResponse(response)) - .await - { + if let Err(err) = socket.send(MessageToProxy::RouteInfoResponse(response)) { tracing::error!(?err, "Error sending route info response to proxy."); } } @@ -59,10 +56,7 @@ pub async fn handle_route_info_request( token, route_info: Some(route_info), }; - if let Err(err) = socket - .send(MessageToProxy::RouteInfoResponse(response)) - .await - { + if let Err(err) = socket.send(MessageToProxy::RouteInfoResponse(response)) { tracing::error!(?err, "Error sending route info response to proxy."); } return Ok(()); @@ -72,10 +66,7 @@ pub async fn handle_route_info_request( token, route_info: None, }; - if let Err(err) = socket - .send(MessageToProxy::RouteInfoResponse(response)) - .await - { + if let Err(err) = socket.send(MessageToProxy::RouteInfoResponse(response)) { tracing::error!(?err, "Error sending route info response to proxy."); } return Ok(()); @@ -150,10 +141,7 @@ pub async fn handle_route_info_request( token, route_info: None, }; - if let Err(err) = socket - .send(MessageToProxy::RouteInfoResponse(response)) - .await - { + if let Err(err) = socket.send(MessageToProxy::RouteInfoResponse(response)) { tracing::error!(?err, "Error sending route info response to proxy."); } } @@ -186,11 +174,9 @@ pub async fn handle_message_from_proxy( "Tried to update keepalive for non-existent backend" ); - socket - .send(MessageToProxy::BackendRemoved { - backend: backend_id, - }) - .await?; + socket.send(MessageToProxy::BackendRemoved { + backend: backend_id, + })?; } Err(err) => { tracing::error!( @@ -255,10 +241,7 @@ pub async fn handle_message_from_proxy( "Sending cert manager response" ); - if let Err(err) = socket - .send(MessageToProxy::CertManagerResponse(response)) - .await - { + if let Err(err) = socket.send(MessageToProxy::CertManagerResponse(response)) { tracing::error!(?err, "Error sending cert manager response to proxy."); } } @@ -307,7 +290,7 @@ pub async fn proxy_socket_inner( continue; } }; - socket.send(MessageToProxy::BackendRemoved { backend: backend_id }).await?; + socket.send(MessageToProxy::BackendRemoved { backend: backend_id })?; }, Some(_) => (), None => { diff --git a/plane/src/dns/mod.rs b/plane/src/dns/mod.rs index e53e0b786..b85f22992 100644 --- a/plane/src/dns/mod.rs +++ b/plane/src/dns/mod.rs @@ -81,7 +81,7 @@ impl AcmeDnsServer { outbound = recv.recv() => { match outbound { Ok(message) => { - if let Err(err) = socket.send(message).await { + if let Err(err) = socket.send(message) { tracing::warn!(?err, "Error sending message to DNS server."); } } diff --git a/plane/src/typed_socket/mod.rs b/plane/src/typed_socket/mod.rs index 8b23beedb..971e54ee3 100644 --- a/plane/src/typed_socket/mod.rs +++ b/plane/src/typed_socket/mod.rs @@ -67,12 +67,10 @@ impl TypedSocketSender { } impl TypedSocket { - pub async fn send(&mut self, message: T) -> Result<(), PlaneClientError> { - if let Err(e) = self.send.send(SocketAction::Send(message)).await { - tracing::error!(?e, "Failed to send message on websocket. Closing receiver."); - self.recv.close(); - return Err(PlaneClientError::SendFailed); - } + pub fn send(&mut self, message: T) -> Result<(), PlaneClientError> { + self.send + .try_send(SocketAction::Send(message)) + .map_err(|_| PlaneClientError::SendFailed)?; Ok(()) } From cd2447ea9ac796d1a8a7e9bea3cf9500b192da2e Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Tue, 10 Sep 2024 18:09:19 -0400 Subject: [PATCH 3/6] rename disconnected to clogged --- plane/src/typed_socket/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plane/src/typed_socket/mod.rs b/plane/src/typed_socket/mod.rs index 971e54ee3..1d822cde2 100644 --- a/plane/src/typed_socket/mod.rs +++ b/plane/src/typed_socket/mod.rs @@ -42,13 +42,13 @@ pub enum TypedSocketError { #[error("Socket closed")] Closed, #[error("Socket disconnected")] - Disconnected, + Clogged, } impl From> for TypedSocketError { fn from(e: TrySendError) -> Self { match e { - TrySendError::Full(_) => Self::Disconnected, + TrySendError::Full(_) => Self::Clogged, TrySendError::Closed(_) => Self::Closed, } } From c81eadd59789288e88aa99b0027366834b53fde4 Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Tue, 10 Sep 2024 18:19:14 -0400 Subject: [PATCH 4/6] use try_send in socket.next() --- plane/src/typed_socket/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plane/src/typed_socket/client.rs b/plane/src/typed_socket/client.rs index 01045446b..22bdc8727 100644 --- a/plane/src/typed_socket/client.rs +++ b/plane/src/typed_socket/client.rs @@ -154,7 +154,7 @@ async fn new_client( continue; } }; - if let Err(e) = send_to_client.send(result).await { + if let Err(e) = send_to_client.try_send(result) { tracing::error!(%e, "Error sending message."); } } From 29e20712cebafff96403be061086b7efd7bcfa5c Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Tue, 10 Sep 2024 18:41:47 -0400 Subject: [PATCH 5/6] clearer error --- plane/src/typed_socket/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plane/src/typed_socket/mod.rs b/plane/src/typed_socket/mod.rs index 1d822cde2..abb8d217b 100644 --- a/plane/src/typed_socket/mod.rs +++ b/plane/src/typed_socket/mod.rs @@ -41,7 +41,7 @@ impl Debug for TypedSocketSender { pub enum TypedSocketError { #[error("Socket closed")] Closed, - #[error("Socket disconnected")] + #[error("Socket queue full")] Clogged, } From 550fc6f24a1f5a291b73c142c57aae2b95c96cc6 Mon Sep 17 00:00:00 2001 From: Paul Butler Date: Tue, 10 Sep 2024 18:42:30 -0400 Subject: [PATCH 6/6] error descriptions --- plane/src/typed_socket/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plane/src/typed_socket/mod.rs b/plane/src/typed_socket/mod.rs index abb8d217b..e4159a1e3 100644 --- a/plane/src/typed_socket/mod.rs +++ b/plane/src/typed_socket/mod.rs @@ -39,9 +39,9 @@ impl Debug for TypedSocketSender { #[derive(Debug, thiserror::Error)] pub enum TypedSocketError { - #[error("Socket closed")] + #[error("Receiver closed")] Closed, - #[error("Socket queue full")] + #[error("Receiver queue full")] Clogged, }