Skip to content

Commit

Permalink
Improve typed socket connection robustness (#810)
Browse files Browse the repository at this point in the history
What we know:

- Sometimes, when there are network issues, some (but not all) drones
fail to reconnect. In the logs, it shows up as `failed to send
heartbeat` (from
[heartbeat.rs](https://github.com/jamsocket/plane/blob/afc9b7f0786f69770fb9fe4b9731fde566dc793d/plane/src/drone/heartbeat.rs#L19)),
with the `err` value of `Disconnected`
([typed_socket](https://github.com/jamsocket/plane/blob/afc9b7f0786f69770fb9fe4b9731fde566dc793d/plane/src/typed_socket/mod.rs#L45)).
- When we “send” websocket messages, we are actually sending messages to
a queue that gets picked up by the websocket event loop asynchronously.
Disconnected [actually means that the message queue is
full](https://github.com/jamsocket/plane/blob/afc9b7f0786f69770fb9fe4b9731fde566dc793d/plane/src/typed_socket/mod.rs#L51),
which *implies* we are disconnected, but is not immediate.
- On a `TypedSocket`, `send()` [uses
Sender::send](https://github.com/jamsocket/plane/blob/afc9b7f0786f69770fb9fe4b9731fde566dc793d/plane/src/typed_socket/mod.rs#L70-L76),
while `TypedSocketSender`'s `send()` uses
[try_send](https://github.com/jamsocket/plane/blob/afc9b7f0786f69770fb9fe4b9731fde566dc793d/plane/src/typed_socket/mod.rs#L92).
- In tokio,
[`send`](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html#method.send)
on a full channel blocks until the channel has capacity. `try_send`
returns immediately if the channel is full.

My leading theory is that when the network is interrupted abruptly, upon
reconnecting to the controller, the controller sends a bunch of
messages, causing a deadlock:

- the `new_client` loop is stalled waiting for capacity in
send_to_client, which won't happen until `socket.recv()` [is
called](https://github.com/jamsocket/plane/blob/afc9b7f0786f69770fb9fe4b9731fde566dc793d/plane/src/drone/mod.rs#L96-L99)
in the main drone event loop
- the main drone event loop is waiting on a call to `socket.send()` when
[acking an
action](https://github.com/jamsocket/plane/blob/afc9b7f0786f69770fb9fe4b9731fde566dc793d/plane/src/drone/mod.rs#L142)

This PR introduces several changes, which should improve the robustness
of reconnects:

- Instead of handling messages from the controller directly in the drone
event loop, they are sent to separate tasks. This means that nothing can
get in the way of the drone event loop's ability to call
`socket.recv()`.
- `TypedSocket::send` now uses `Sender::try_send`, for consistency with
`TypedSocketSender::send`. Note that as a result of moving handling out
of the drone event loop, messages sent from the drone loop now use
`TypedSocketSender` instead of `TypedSocket` anyway, so those messages
would now use `try_send` regardless.
- As a result of using `try_send`, `TypedSocket::send` no longer needs
to be async, so that's removed.
- `TypedSocketError::Disconnected` is renamed to
`TypedSocketError::Clogged`, to better reflect what the issue is.
  • Loading branch information
paulgb authored Sep 10, 2024
1 parent afc9b7f commit 3c4cdbd
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 126 deletions.
2 changes: 0 additions & 2 deletions plane/plane-tests/tests/backend_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) {
.send(MessageFromDrone::Heartbeat(Heartbeat {
local_time: LoggableTime(Utc::now()),
}))
.await
.unwrap();

// Wait for the drone to be registered.
Expand Down Expand Up @@ -132,7 +131,6 @@ async fn backend_action_resent_if_not_acked(env: TestEnvironment) {

drone_connection
.send(MessageFromDrone::AckAction { action_id })
.await
.unwrap();

// Drone connections should always be closed to prevent a warning, but this
Expand Down
2 changes: 0 additions & 2 deletions plane/plane-tests/tests/backend_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ async fn backend_lifecycle(env: TestEnvironment) {
.send(MessageFromProxy::RouteInfoRequest(RouteInfoRequest {
token: response.token.clone(),
}))
.await
.unwrap();

let result = proxy.recv().with_timeout(10).await.unwrap().unwrap();
Expand Down Expand Up @@ -179,7 +178,6 @@ async fn backend_lifecycle(env: TestEnvironment) {
tracing::info!("Sending keepalive.");
proxy
.send(MessageFromProxy::KeepAlive(response.backend_id.clone()))
.await
.unwrap();

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Expand Down
3 changes: 0 additions & 3 deletions plane/plane-tests/tests/dns_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ async fn dns_api(env: TestEnvironment) {
.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::CertLeaseRequest,
))
.await
.unwrap();

let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse {
Expand All @@ -47,7 +46,6 @@ async fn dns_api(env: TestEnvironment) {
txt_value: "foobaz".to_string(),
},
))
.await
.unwrap();

let MessageToProxy::CertManagerResponse(CertManagerResponse::SetTxtRecordResponse {
Expand All @@ -61,7 +59,6 @@ async fn dns_api(env: TestEnvironment) {
.send(MessageFromDns::TxtRecordRequest {
cluster: env.cluster.clone(),
})
.await
.unwrap();

let MessageToDns::TxtRecordResponse { cluster, txt_value } = dns_client.recv().await.unwrap();
Expand Down
5 changes: 0 additions & 5 deletions plane/plane-tests/tests/proxy_cert_lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ async fn request_dns_lease(env: TestEnvironment) {
.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::CertLeaseRequest,
))
.await
.unwrap();

let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse {
Expand All @@ -47,7 +46,6 @@ async fn request_dns_lease_fails_when_held(env: TestEnvironment) {
.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::CertLeaseRequest,
))
.await
.unwrap();

let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse {
Expand All @@ -68,7 +66,6 @@ async fn request_dns_lease_fails_when_held(env: TestEnvironment) {
.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::CertLeaseRequest,
))
.await
.unwrap();

let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse {
Expand All @@ -82,7 +79,6 @@ async fn request_dns_lease_fails_when_held(env: TestEnvironment) {
.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::ReleaseCertLease,
))
.await
.unwrap();

// Avoid race.
Expand All @@ -92,7 +88,6 @@ async fn request_dns_lease_fails_when_held(env: TestEnvironment) {
.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::CertLeaseRequest,
))
.await
.unwrap();

let MessageToProxy::CertManagerResponse(CertManagerResponse::CertLeaseResponse {
Expand Down
6 changes: 2 additions & 4 deletions plane/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE

conn.send(MessageFromProxy::CertManagerRequest(
CertManagerRequest::CertLeaseRequest,
))
.await?;
))?;

let response = conn.recv().await.expect("Failed to receive response");

Expand All @@ -346,8 +345,7 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
CertManagerRequest::SetTxtRecord {
txt_value: message.clone(),
},
))
.await?;
))?;

let response = conn.recv().await.expect("Failed to receive response");

Expand Down
2 changes: 1 addition & 1 deletion plane/src/controller/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn dns_socket_inner(
let message = MessageToDns::TxtRecordResponse { cluster, txt_value };
tracing::info!(?message, "Sending txt record response to drone.");

if let Err(err) = socket.send(message).await {
if let Err(err) = socket.send(message) {
tracing::error!(?err, "Error sending txt record response to drone.");
}
}
Expand Down
16 changes: 6 additions & 10 deletions plane/src/controller/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,9 @@ pub async fn handle_message_from_drone(
.update_state(&backend_event.backend_id, backend_event.state)
.await?;

sender
.send(MessageToDrone::AckEvent {
event_id: backend_event.event_id,
})
.await?;
sender.send(MessageToDrone::AckEvent {
event_id: backend_event.event_id,
})?;
}
MessageFromDrone::AckAction { action_id } => {
controller
Expand Down Expand Up @@ -96,9 +94,7 @@ pub async fn handle_message_from_drone(
deadlines: Some(deadlines),
};

sender
.send(MessageToDrone::RenewKeyResponse(renew_key_response))
.await?;
sender.send(MessageToDrone::RenewKeyResponse(renew_key_response))?;
}
}

Expand Down Expand Up @@ -154,7 +150,7 @@ pub async fn process_pending_actions(
let mut count = 0;
for pending_action in db.backend_actions().pending_actions(*drone_id).await? {
let message = MessageToDrone::Action(pending_action);
socket.send(message).await?;
socket.send(message)?;
count += 1;
}

Expand Down Expand Up @@ -205,7 +201,7 @@ pub async fn drone_socket_inner(
match backend_action_result {
Some(backend_action) => {
let message = MessageToDrone::Action(backend_action.payload);
if let Err(err) = socket.send(message).await {
if let Err(err) = socket.send(message) {
tracing::error!(?err, "Error sending backend action to drone");
}
}
Expand Down
35 changes: 9 additions & 26 deletions plane/src/controller/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ pub async fn handle_route_info_request(
token,
route_info: Some(route_info),
};
if let Err(err) = socket
.send(MessageToProxy::RouteInfoResponse(response))
.await
{
if let Err(err) = socket.send(MessageToProxy::RouteInfoResponse(response)) {
tracing::error!(?err, "Error sending route info response to proxy.");
}
}
Expand All @@ -59,10 +56,7 @@ pub async fn handle_route_info_request(
token,
route_info: Some(route_info),
};
if let Err(err) = socket
.send(MessageToProxy::RouteInfoResponse(response))
.await
{
if let Err(err) = socket.send(MessageToProxy::RouteInfoResponse(response)) {
tracing::error!(?err, "Error sending route info response to proxy.");
}
return Ok(());
Expand All @@ -72,10 +66,7 @@ pub async fn handle_route_info_request(
token,
route_info: None,
};
if let Err(err) = socket
.send(MessageToProxy::RouteInfoResponse(response))
.await
{
if let Err(err) = socket.send(MessageToProxy::RouteInfoResponse(response)) {
tracing::error!(?err, "Error sending route info response to proxy.");
}
return Ok(());
Expand Down Expand Up @@ -150,10 +141,7 @@ pub async fn handle_route_info_request(
token,
route_info: None,
};
if let Err(err) = socket
.send(MessageToProxy::RouteInfoResponse(response))
.await
{
if let Err(err) = socket.send(MessageToProxy::RouteInfoResponse(response)) {
tracing::error!(?err, "Error sending route info response to proxy.");
}
}
Expand Down Expand Up @@ -186,11 +174,9 @@ pub async fn handle_message_from_proxy(
"Tried to update keepalive for non-existent backend"
);

socket
.send(MessageToProxy::BackendRemoved {
backend: backend_id,
})
.await?;
socket.send(MessageToProxy::BackendRemoved {
backend: backend_id,
})?;
}
Err(err) => {
tracing::error!(
Expand Down Expand Up @@ -255,10 +241,7 @@ pub async fn handle_message_from_proxy(
"Sending cert manager response"
);

if let Err(err) = socket
.send(MessageToProxy::CertManagerResponse(response))
.await
{
if let Err(err) = socket.send(MessageToProxy::CertManagerResponse(response)) {
tracing::error!(?err, "Error sending cert manager response to proxy.");
}
}
Expand Down Expand Up @@ -307,7 +290,7 @@ pub async fn proxy_socket_inner(
continue;
}
};
socket.send(MessageToProxy::BackendRemoved { backend: backend_id }).await?;
socket.send(MessageToProxy::BackendRemoved { backend: backend_id })?;
},
Some(_) => (),
None => {
Expand Down
2 changes: 1 addition & 1 deletion plane/src/dns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl AcmeDnsServer {
outbound = recv.recv() => {
match outbound {
Ok(message) => {
if let Err(err) = socket.send(message).await {
if let Err(err) = socket.send(message) {
tracing::warn!(?err, "Error sending message to DNS server.");
}
}
Expand Down
Loading

0 comments on commit 3c4cdbd

Please sign in to comment.