Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] catch panics in handlers #2442

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions rust/worker/src/system/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@ impl ChromaError for ChannelError {
}
}

#[derive(Error, Debug)]
pub enum ChannelRequestError {
#[derive(Error, Debug, PartialEq)]
pub enum RequestError {
#[error("Failed to send request")]
RequestError,
SendError,
#[error("Failed to receive response")]
ResponseError,
ReceiveError,
#[error("Message handler panicked")]
HandlerPanic(Option<String>),
HammadB marked this conversation as resolved.
Show resolved Hide resolved
}

impl ChromaError for ChannelRequestError {
impl ChromaError for RequestError {
fn code(&self) -> ErrorCodes {
ErrorCodes::Internal
}
Expand Down
22 changes: 22 additions & 0 deletions rust/worker/src/system/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ where

#[cfg(test)]
mod tests {
use crate::system::RequestError;

use super::*;
use async_trait::async_trait;

Expand Down Expand Up @@ -155,6 +157,10 @@ mod tests {
message: usize,
_ctx: &ComponentContext<TestComponent>,
) -> Self::Result {
if message == 0 {
panic!("Invalid input");
}

self.counter += message;
return self.counter;
}
Expand All @@ -180,4 +186,20 @@ mod tests {
assert_eq!(1, handle.request(1, None).await.unwrap());
assert_eq!(2, handle.request(1, None).await.unwrap());
}

#[tokio::test]
async fn catches_panic() {
let system = System::new();
let component = TestComponent::new(10);
let handle = system.start_component(component);

let err = handle.request(0, None).await.unwrap_err();
assert_eq!(
RequestError::HandlerPanic(Some("Invalid input".to_string())),
err
);

// Component is still alive
assert_eq!(1, handle.request(1, None).await.unwrap());
}
}
20 changes: 14 additions & 6 deletions rust/worker/src/system/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::{scheduler::Scheduler, ChannelError, ChannelRequestError, WrappedMessage};
use super::{
scheduler::Scheduler, ChannelError, MessageHandlerError, RequestError, WrappedMessage,
};
use async_trait::async_trait;
use core::panic;
use futures::Stream;
Expand Down Expand Up @@ -141,7 +143,7 @@ impl<C: Component> ComponentSender<C> {
&self,
message: M,
tracing_context: Option<tracing::Span>,
) -> Result<C::Result, ChannelRequestError>
) -> Result<C::Result, RequestError>
where
C: Handler<M>,
M: Message,
Expand All @@ -150,10 +152,16 @@ impl<C: Component> ComponentSender<C> {
self.sender
.send(WrappedMessage::new(message, Some(tx), tracing_context))
.await
.map_err(|_| ChannelRequestError::RequestError)?;
.map_err(|_| RequestError::SendError)?;

let result = rx.await.map_err(|_| ChannelRequestError::ResponseError)?;
Ok(result)
let result = rx.await.map_err(|_| RequestError::ReceiveError)?;

match result {
Ok(result) => Ok(result),
Err(err) => match err {
MessageHandlerError::Panic(p) => Err(RequestError::HandlerPanic(p)),
},
}
}
}

Expand Down Expand Up @@ -253,7 +261,7 @@ impl<C: Component> ComponentHandle<C> {
&self,
message: M,
tracing_context: Option<tracing::Span>,
) -> Result<C::Result, ChannelRequestError>
) -> Result<C::Result, RequestError>
where
C: Handler<M>,
M: Message,
Expand Down
54 changes: 45 additions & 9 deletions rust/worker/src/system/wrapped_message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::{Component, ComponentContext, Handler, Message};
use async_trait::async_trait;
use std::fmt::Debug;
use futures::FutureExt;
use std::{fmt::Debug, panic::AssertUnwindSafe};
use thiserror::Error;
use tokio::sync::oneshot;

// Why is this separate from the WrappedMessage struct? WrappedMessage is only generic
Expand All @@ -24,6 +26,14 @@ impl<M: Message, Result: Send> HandleableMessageImpl<M, Result> {
}
}

#[derive(Debug, Error)]
pub(super) enum MessageHandlerError {
#[error("Panic occurred while handling message: {0:?}")]
Panic(Option<String>),
}

type MessageHandlerWrappedResult<R> = Result<R, MessageHandlerError>;

/// Erases the type of the message so it can be sent over a channel and optionally bundles a tracing context.
#[derive(Debug)]
pub(crate) struct WrappedMessage<C>
Expand All @@ -37,7 +47,7 @@ where
impl<C: Component> WrappedMessage<C> {
pub(super) fn new<M>(
message: M,
reply_channel: Option<oneshot::Sender<C::Result>>,
reply_channel: Option<oneshot::Sender<MessageHandlerWrappedResult<C::Result>>>,
tracing_context: Option<tracing::Span>,
) -> Self
where
Expand Down Expand Up @@ -68,19 +78,45 @@ where
}

#[async_trait]
impl<C, M> HandleableMessage<C> for Option<HandleableMessageImpl<M, C::Result>>
impl<C, M> HandleableMessage<C>
for Option<HandleableMessageImpl<M, MessageHandlerWrappedResult<C::Result>>>
where
C: Component + Handler<M>,
M: Message,
{
async fn handle_and_reply(&mut self, component: &mut C, ctx: &ComponentContext<C>) -> () {
if let Some(message) = self.take() {
let result = component.handle(message.message, ctx).await;
if let Some(reply_channel) = message.reply_channel {
reply_channel
.send(result)
.expect("message reply channel was unexpectedly dropped by caller");
}
let result = AssertUnwindSafe(component.handle(message.message, ctx))
.catch_unwind()
.await;

match result {
Ok(result) => {
if let Some(reply_channel) = message.reply_channel {
reply_channel
.send(Ok(result))
.expect("message reply channel was unexpectedly dropped by caller");
}
}
Err(panic_value) => {
#[allow(clippy::manual_map)]
let panic_value = if let Some(s) = panic_value.downcast_ref::<&str>() {
Some(&**s)
} else if let Some(s) = panic_value.downcast_ref::<String>() {
Some(s.as_str())
} else {
None
};

if let Some(reply_channel) = message.reply_channel {
reply_channel
.send(Err(MessageHandlerError::Panic(
panic_value.map(ToString::to_string),
)))
.expect("message reply channel was unexpectedly dropped by caller");
}
}
};
}
}
}
Loading