Skip to content

Commit

Permalink
Improve typed unix socket logging (#800)
Browse files Browse the repository at this point in the history
Helpful to see the message in context of the error.
  • Loading branch information
michaelsilver authored Aug 14, 2024
1 parent 03df75d commit 25be1ec
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 26 deletions.
36 changes: 22 additions & 14 deletions plane/src/typed_unix_socket/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ async fn connect<P: AsRef<Path>>(socket_path: P) -> UnixStream {
);
loop {
match UnixStream::connect(&socket_path).await {
Ok(stream) => return stream,
Ok(stream) => {
tracing::info!(?socket_path, "Connected to server.");
return stream;
}
Err(e) => {
tracing::error!("Error connecting to server: {}", e);
tracing::error!(%e, "Error connecting to server.");
backoff.wait().await;
}
}
Expand Down Expand Up @@ -153,7 +156,7 @@ where
{
Ok(msg) => msg,
Err(e) => {
tracing::error!("Error deserializing message: {}", e);
tracing::error!(%e, ?line, "Error deserializing message.");
continue;
}
};
Expand All @@ -163,16 +166,21 @@ where
message,
} => {
if let Some((_, tx)) = response_map.remove(&id) {
if let Err(e) = tx.send(message) {
tracing::error!("Error sending response: {:?}", e);
if let Err(e) = tx.send(message.clone()) {
// There's no need to log the message as e is the message itself
tracing::error!(?e, "Error sending response.");
}
} else {
tracing::error!("No sender found for response ID: {:?}", id);
tracing::error!(
?id,
msg = ?message,
"No sender found for response ID."
);
}
}
WrappedMessage { id: None, message } => {
if let Err(e) = event_tx.send(message) {
tracing::error!("Error sending event: {}", e);
if let Err(e) = event_tx.send(message.clone()) {
tracing::error!(%e, msg = ?message, "Error sending event.");
}
}
}
Expand All @@ -184,7 +192,7 @@ where
));
}
Err(e) => {
tracing::error!("Error reading line: {}", e);
tracing::error!(%e, "Error reading line.");
return Err(anyhow::anyhow!("Error reading line: {}", e));
}
}
Expand All @@ -201,25 +209,25 @@ where
let msg = match serde_json::to_string(&msg) {
Ok(msg) => msg,
Err(e) => {
tracing::error!("Error serializing message: {}", e);
tracing::error!(%e, ?msg, "Error serializing message.");
continue;
}
};
if let Err(e) = writer.write_all(msg.as_bytes()).await {
tracing::error!("Error writing message: {}", e);
tracing::error!(%e, ?msg, "Error writing message.");
}
if let Err(e) = writer.write_all(b"\n").await {
tracing::error!("Error writing newline: {}", e);
tracing::error!(%e, ?msg, "Error writing newline.");
}
if let Err(e) = writer.flush().await {
tracing::error!("Error flushing writer: {}", e);
tracing::error!(%e, ?msg, "Error flushing writer.");
}
}
Err(RecvError::Closed) => {
break;
}
Err(e) => {
tracing::error!("Error receiving message: {}", e);
tracing::error!(%e, "Error receiving message.");
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions plane/src/typed_unix_socket/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
tracing::error!("Error handling connection");
}
Err(e) => {
tracing::error!("Error accepting connection: {}", e);
tracing::error!(%e, "Error accepting connection.");
}
}
response_rx = response_tx.subscribe();
Expand Down Expand Up @@ -142,19 +142,19 @@ where
{
Ok(msg) => msg,
Err(e) => {
tracing::error!("Error deserializing message: {}", e);
tracing::error!(%e, ?line, "Error deserializing message.");
continue;
}
};
match msg {
WrappedMessage { id: Some(_), .. } => {
if let Err(e) = request_tx.send(msg) {
tracing::error!("Error sending request: {}", e);
if let Err(e) = request_tx.send(msg.clone()) {
tracing::error!(%e, ?msg, "Error sending request.");
}
}
WrappedMessage { id: None, message } => {
if let Err(e) = event_tx.send(message) {
tracing::error!("Error sending event: {}", e);
if let Err(e) = event_tx.send(message.clone()) {
tracing::error!(%e, msg = ?message, "Error sending event.");
}
}
}
Expand All @@ -166,7 +166,7 @@ where
));
}
Err(e) => {
tracing::error!("Error reading line: {}", e);
tracing::error!(%e, "Error reading line.");
return Err(anyhow::anyhow!("Error reading line: {}", e));
}
}
Expand All @@ -184,22 +184,22 @@ where
let response_str = match serde_json::to_string(&response) {
Ok(response_str) => response_str,
Err(e) => {
tracing::error!("Error serializing response: {}", e);
tracing::error!(%e, ?response, "Error serializing response.");
continue;
}
};
if let Err(e) = writer.write_all(response_str.as_bytes()).await {
tracing::error!("Error writing response: {}", e);
tracing::error!(%e, ?response, "Error writing response.");
}
if let Err(e) = writer.write_all(b"\n").await {
tracing::error!("Error writing newline: {}", e);
tracing::error!(%e, ?response, "Error writing newline.");
}
if let Err(e) = writer.flush().await {
tracing::error!("Error flushing writer: {}", e);
tracing::error!(%e, ?response, "Error flushing writer.");
}
}
Err(e) => {
tracing::error!("Error receiving response: {}", e);
tracing::error!(%e, "Error receiving response.");
}
}
}
Expand Down

0 comments on commit 25be1ec

Please sign in to comment.