Skip to content

Commit

Permalink
feat(runtime)!: make actors abortable from init (#279)
Browse files Browse the repository at this point in the history
* Avoid adding the shutdown handle to actors until init is completed. Move INX connection retry logic to init. Better status error handling.

* Cleanup

* Config comments
  • Loading branch information
Alexandcoats authored Jun 20, 2022
1 parent 9b9e442 commit 3784e7d
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 66 deletions.
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Actor for ApiWorker {
.with_graceful_shutdown(shutdown_signal(receiver))
.await;
// If the Axum server shuts down, we should also shutdown the API actor
api_handle.shutdown();
api_handle.shutdown().await;
res
});
self.server_handle = Some((join_handle, sender));
Expand Down
40 changes: 16 additions & 24 deletions bin/inx-chronicle/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,9 @@ pub enum LauncherError {
/// Supervisor actor
pub struct Launcher;

pub struct LauncherState {
config: ChronicleConfig,
#[allow(dead_code)]
db: MongoDb,
}

#[async_trait]
impl Actor for Launcher {
type State = LauncherState;
type State = ChronicleConfig;
type Error = LauncherError;

async fn init(&mut self, cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
Expand Down Expand Up @@ -70,7 +64,7 @@ impl Actor for Launcher {
cx.spawn_child(super::metrics::MetricsWorker::new(&db, &config.metrics))
.await;

Ok(LauncherState { config, db })
Ok(config)
}

fn name(&self) -> std::borrow::Cow<'static, str> {
Expand All @@ -85,25 +79,15 @@ impl HandleEvent<Report<super::stardust_inx::InxWorker>> for Launcher {
&mut self,
cx: &mut ActorContext<Self>,
event: Report<super::stardust_inx::InxWorker>,
LauncherState { config, db }: &mut Self::State,
config: &mut Self::State,
) -> Result<(), Self::Error> {
use chronicle::runtime::SpawnActor;

use super::stardust_inx::InxError;
match event {
Report::Success(_) => {
cx.abort().await;
}
Report::Error(report) => match report.error {
ActorError::Result(e) => match e {
InxError::ConnectionError => {
log::warn!(
"INX connection failed. Retrying in {}s.",
config.inx.connection_retry_interval.as_secs()
);
let inx_worker = super::stardust_inx::InxWorker::new(db, &config.inx);
cx.delay(SpawnActor::new(inx_worker), config.inx.connection_retry_interval)?;
}
InxError::MongoDb(e) => match e.kind.as_ref() {
// Only a few possible errors we could potentially recover from
mongodb::error::ErrorKind::Io(_)
Expand All @@ -116,9 +100,17 @@ impl HandleEvent<Report<super::stardust_inx::InxWorker>> for Launcher {
cx.abort().await;
}
},
InxError::Read(_) => {
cx.spawn_child(report.actor).await;
}
InxError::Read(e) => match e.code() {
inx::tonic::Code::DeadlineExceeded
| inx::tonic::Code::ResourceExhausted
| inx::tonic::Code::Aborted
| inx::tonic::Code::Unavailable => {
cx.spawn_child(report.actor).await;
}
_ => {
cx.abort().await;
}
},
_ => {
cx.abort().await;
}
Expand All @@ -139,7 +131,7 @@ impl HandleEvent<Report<super::api::ApiWorker>> for Launcher {
&mut self,
cx: &mut ActorContext<Self>,
event: Report<super::api::ApiWorker>,
LauncherState { config, .. }: &mut Self::State,
config: &mut Self::State,
) -> Result<(), Self::Error> {
match event {
Report::Success(_) => {
Expand All @@ -166,7 +158,7 @@ impl HandleEvent<Report<super::metrics::MetricsWorker>> for Launcher {
&mut self,
cx: &mut ActorContext<Self>,
event: Report<super::metrics::MetricsWorker>,
LauncherState { config, .. }: &mut Self::State,
config: &mut Self::State,
) -> Result<(), Self::Error> {
match event {
Report::Success(_) => {
Expand Down
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Actor for MetricsWorker {
.await;

// Stop the actor if the server stops.
metrics_handle.shutdown();
metrics_handle.shutdown().await;

res.unwrap()
})
Expand Down
7 changes: 5 additions & 2 deletions bin/inx-chronicle/src/stardust_inx/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use std::time::Duration;
use chronicle::types::tangle::MilestoneIndex;
use serde::{Deserialize, Serialize};

/// A builder to establish a connection to INX.
#[must_use]
/// Configuration for an INX connection.
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct InxConfig {
Expand All @@ -16,6 +15,9 @@ pub struct InxConfig {
/// The time that has to pass until a new connection attempt is made.
#[serde(with = "humantime_serde")]
pub connection_retry_interval: Duration,
/// The number of retries when connecting fails.
pub connection_retry_count: usize,
/// The milestone at which synchronization should begin.
pub sync_start_milestone: MilestoneIndex,
}

Expand All @@ -24,6 +26,7 @@ impl Default for InxConfig {
Self {
connect_url: "http://localhost:9029".into(),
connection_retry_interval: Duration::from_secs(5),
connection_retry_count: 5,
sync_start_milestone: 1.into(),
}
}
Expand Down
17 changes: 14 additions & 3 deletions bin/inx-chronicle/src/stardust_inx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,20 @@ impl InxWorker {
return Err(InxError::InvalidAddress(inx_config.connect_url.clone()));
}

InxClient::connect(inx_config.connect_url.clone())
.await
.map_err(|_| InxError::ConnectionError)
for i in 0..inx_config.connection_retry_count {
match InxClient::connect(inx_config.connect_url.clone()).await {
Ok(inx_client) => return Ok(inx_client),
Err(_) => {
log::warn!(
"INX connection failed. Retrying in {}s. {} retries remaining.",
inx_config.connection_retry_interval.as_secs(),
inx_config.connection_retry_count - i
);
tokio::time::sleep(inx_config.connection_retry_interval).await;
}
}
}
Err(InxError::ConnectionError)
}

async fn spawn_syncer(
Expand Down
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/stardust_inx/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl HandleEvent<SyncNext> for Syncer {
.await;
} else {
log::info!("Successfully finished synchronization with node.");
cx.shutdown();
cx.shutdown().await;
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/actor/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl<A: Actor> Addr<A> {
}

/// Shuts down the actor. Use with care!
pub fn shutdown(&self) {
self.scope.shutdown();
pub async fn shutdown(&self) {
self.scope.shutdown().await;
}

/// Aborts the actor. Use with care!
Expand Down
9 changes: 8 additions & 1 deletion src/runtime/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ use super::{
Actor,
};
use crate::runtime::{
config::SpawnConfig, error::RuntimeError, scope::RuntimeScope, shutdown::ShutdownStream, Sender, Task, TaskReport,
config::SpawnConfig,
error::RuntimeError,
scope::RuntimeScope,
shutdown::{ShutdownHandle, ShutdownStream},
Sender, Task, TaskReport,
};

type Receiver<A> = ShutdownStream<EnvelopeStream<A>>;
Expand Down Expand Up @@ -95,10 +99,13 @@ impl<A: Actor> ActorContext<A> {
actor: &mut A,
actor_state: &mut Option<A::State>,
abort_reg: AbortRegistration,
shutdown_handle: ShutdownHandle,
) -> Result<Result<Result<(), A::Error>, Box<dyn Any + Send>>, Aborted> {
let res = Abortable::new(
AssertUnwindSafe(async {
let mut state = actor.init(self).await?;
// Set the shutdown handle before starting the event loop.
self.scope.0.set_shutdown_handle(shutdown_handle).await;
// Call handle events until shutdown
let res = actor.run(self, &mut state).await;
let res = actor.shutdown(self, &mut state, res).await;
Expand Down
28 changes: 14 additions & 14 deletions src/runtime/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub(crate) struct Scope {
pub(crate) struct ScopeInner {
pub(crate) id: ScopeId,
address_registry: RwLock<AddressRegistry>,
shutdown_handle: Option<ShutdownHandle>,
abort_handle: Option<AbortHandle>,
shutdown_handle: RwLock<Option<ShutdownHandle>>,
abort_handle: AbortHandle,
parent: Option<Scope>,
children: RwLock<HashMap<ScopeId, Scope>>,
}
Expand All @@ -58,7 +58,7 @@ impl Scope {
id: ROOT_SCOPE,
address_registry: Default::default(),
shutdown_handle: Default::default(),
abort_handle: Some(abort_handle),
abort_handle,
parent: None,
children: Default::default(),
}),
Expand All @@ -68,19 +68,15 @@ impl Scope {
}
}

pub(crate) async fn child(
&self,
shutdown_handle: Option<ShutdownHandle>,
abort_handle: Option<AbortHandle>,
) -> Self {
pub(crate) async fn child(&self, abort_handle: AbortHandle) -> Self {
log::trace!("Adding child to {:x}", self.id.as_fields().0);
let id = Uuid::new_v4();
let parent = self.clone();
let child = Scope {
inner: Arc::new(ScopeInner {
id,
address_registry: Default::default(),
shutdown_handle,
shutdown_handle: Default::default(),
abort_handle,
parent: Some(parent),
children: Default::default(),
Expand All @@ -94,6 +90,10 @@ impl Scope {
child
}

pub(crate) async fn set_shutdown_handle(&self, handle: ShutdownHandle) {
self.inner.shutdown_handle.write().await.replace(handle);
}

/// Finds a scope by id.
pub(crate) fn find(&self, id: ScopeId) -> Option<&Scope> {
if id == self.id {
Expand Down Expand Up @@ -136,13 +136,13 @@ impl Scope {
log::trace!("Dropped scope {:x}", self.id.as_fields().0);
}

pub(crate) fn shutdown(&self) {
pub(crate) async fn shutdown(&self) {
log::trace!("Shutting down scope {:x}", self.id.as_fields().0);
self.valid.store(false, Ordering::Release);
if let Some(handle) = self.shutdown_handle.as_ref() {
if let Some(handle) = self.shutdown_handle.read().await.as_ref() {
handle.shutdown();
} else if let Some(abort) = self.abort_handle.as_ref() {
abort.abort();
} else {
self.abort_handle.abort();
}
log::trace!("Shut down scope {:x}", self.id.as_fields().0);
}
Expand All @@ -155,7 +155,7 @@ impl Scope {
for child_scope in children {
child_scope.abort().await;
}
self.shutdown();
self.shutdown().await;
log::trace!("Aborted scope {:x}", self.id.as_fields().0);
}

Expand Down
30 changes: 13 additions & 17 deletions src/runtime/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ impl ScopeView {
}

/// Shuts down the scope.
pub fn shutdown(&self) {
self.0.shutdown();
pub async fn shutdown(&self) {
self.0.shutdown().await;
}

/// Aborts the tasks in this runtime's scope. This will shutdown tasks that have
Expand All @@ -115,13 +115,9 @@ impl RuntimeScope {
}
}

pub(crate) async fn child(
&self,
shutdown_handle: Option<ShutdownHandle>,
abort_handle: Option<AbortHandle>,
) -> Self {
pub(crate) async fn child(&self, abort_handle: AbortHandle) -> Self {
Self {
scope: ScopeView(self.scope.0.child(shutdown_handle, abort_handle).await),
scope: ScopeView(self.scope.0.child(abort_handle).await),
join_handles: Default::default(),
}
}
Expand All @@ -140,7 +136,7 @@ impl RuntimeScope {
F: Future<Output = Result<O, Box<dyn Error + Send + Sync>>>,
{
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let mut child_scope = self.child(None, Some(abort_handle)).await;
let mut child_scope = self.child(abort_handle).await;
let res = Abortable::new(f(&mut child_scope), abort_registration).await;
if let Ok(Err(_)) = res {
child_scope.abort().await;
Expand Down Expand Up @@ -168,7 +164,7 @@ impl RuntimeScope {
stream,
add_to_registry,
}: SpawnConfigInner<A>,
) -> (Addr<A>, ActorContext<A>, AbortRegistration)
) -> (Addr<A>, ActorContext<A>, AbortRegistration, ShutdownHandle)
where
A: 'static + Actor,
{
Expand Down Expand Up @@ -208,14 +204,14 @@ impl RuntimeScope {
let (receiver, shutdown_handle) = ShutdownStream::new(Box::new(receiver) as _);
(receiver, shutdown_handle)
};
let scope = self.child(Some(shutdown_handle), Some(abort_handle)).await;
let scope = self.child(abort_handle).await;
let handle = Addr::new(scope.scope.clone(), sender);
if add_to_registry {
self.scope.0.insert_addr(handle.clone()).await;
}
let cx = ActorContext::new(scope, handle.clone(), receiver);
log::debug!("Initializing {}", actor.name());
(handle, cx, abort_reg)
(handle, cx, abort_reg, shutdown_handle)
}

/// Spawns a new, plain task.
Expand All @@ -225,7 +221,7 @@ impl RuntimeScope {
Sup: 'static + HandleEvent<TaskReport<T>>,
{
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let mut child_scope = self.child(None, Some(abort_handle.clone())).await;
let mut child_scope = self.child(abort_handle.clone()).await;
let child_task = spawn_task(task.name().as_ref(), async move {
let fut = task.run();
let res = Abortable::new(AssertUnwindSafe(fut).catch_unwind(), abort_registration).await;
Expand Down Expand Up @@ -269,10 +265,10 @@ impl RuntimeScope {
Cfg: Into<SpawnConfig<A>>,
{
let SpawnConfig { mut actor, config } = actor.into();
let (handle, mut cx, abort_reg) = self.common_spawn(&actor, config).await;
let (handle, mut cx, abort_reg, shutdown_handle) = self.common_spawn(&actor, config).await;
let child_task = spawn_task(actor.name().as_ref(), async move {
let mut data = None;
let res = cx.start(&mut actor, &mut data, abort_reg).await;
let res = cx.start(&mut actor, &mut data, abort_reg, shutdown_handle).await;
match res {
Ok(res) => match res {
Ok(res) => match res {
Expand Down Expand Up @@ -313,10 +309,10 @@ impl RuntimeScope {
Cfg: Into<SpawnConfig<A>>,
{
let SpawnConfig { mut actor, config } = actor.into();
let (handle, mut cx, abort_reg) = self.common_spawn(&actor, config).await;
let (handle, mut cx, abort_reg, shutdown_handle) = self.common_spawn(&actor, config).await;
let child_task = spawn_task(actor.name().as_ref(), async move {
let mut data = None;
let res = cx.start(&mut actor, &mut data, abort_reg).await;
let res = cx.start(&mut actor, &mut data, abort_reg, shutdown_handle).await;
match res {
Ok(res) => match res {
Ok(res) => match res {
Expand Down

0 comments on commit 3784e7d

Please sign in to comment.