Fix: flaky test: update replication metrics only when the replication task stopped

Otherwise, the test script believes replication is already shut down and restores the
network communication; unexpected logs are then sent to a follower, hence the test
`stop_replication_to_removed_unreachable_follower_network_failure` fails.
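
The fix enforces a shutdown ordering: close the command channel to the replication task, wait for the task to actually exit, and only then report the target as removed. A minimal, self-contained sketch of that ordering (hypothetical names, not the openraft types):

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Hypothetical stand-in for openraft's ReplicationStream.
struct ReplStream {
    handle: JoinHandle<()>,                  // the spawned replication task
    tx: mpsc::UnboundedSender<&'static str>, // command channel to the task
}

async fn remove_replication(s: ReplStream) {
    // Dropping the only sender closes the channel, which tells the task to stop.
    drop(s.tx);

    // Wait until the task has really exited; until then it may still send
    // entries to the follower that is being removed.
    let _ = s.handle.await;

    // Only now is it safe to report the replication as removed (update metrics,
    // let a test script restore the network, ...).
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let handle = tokio::spawn(async move {
        // The loop ends once the channel is closed and drained: recv() yields None.
        while let Some(cmd) = rx.recv().await {
            println!("replicating: {cmd}");
        }
    });

    let _ = tx.send("entry-1");
    remove_replication(ReplStream { handle, tx }).await;
}
```

Dropping the sender and then awaiting the `JoinHandle` is what guarantees no further append-entries can reach the removed follower after the metrics are updated, which is exactly the window the flaky test was hitting.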
drmingdrmer committed Jun 4, 2022
1 parent dc71966 commit 797fb9b
Showing 4 changed files with 25 additions and 15 deletions.
18 changes: 14 additions & 4 deletions openraft/src/core/admin.rs
@@ -281,7 +281,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
///
/// This is ony called by leader.
#[tracing::instrument(level = "debug", skip(self))]
pub(super) fn handle_uniform_consensus_committed(&mut self, log_id: &LogId<C::NodeId>) {
pub(super) async fn handle_uniform_consensus_committed(&mut self, log_id: &LogId<C::NodeId>) {
let index = log_id.index;

// Step down if needed.
@@ -313,7 +313,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

let targets = self.nodes.keys().cloned().collect::<Vec<_>>();
for target in targets {
self.try_remove_replication(target);
self.try_remove_replication(target).await;
}

self.core.engine.metrics_flags.set_replication_changed();
@@ -323,7 +323,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
///
/// Return true if removed.
#[tracing::instrument(level = "trace", skip(self))]
pub fn try_remove_replication(&mut self, target: C::NodeId) -> bool {
pub async fn try_remove_replication(&mut self, target: C::NodeId) -> bool {
tracing::debug!(target = display(target), "try_remove_replication");

{
@@ -340,7 +340,17 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
}

tracing::info!("removed replication to: {}", target);
self.nodes.remove(&target);
let repl_state = self.nodes.remove(&target);
if let Some(s) = repl_state {
let handle = s.repl_stream.handle;

// Drop sender to notify the task to shutdown
drop(s.repl_stream.repl_tx);

tracing::debug!("joining removed replication: {}", target);
let _x = handle.await;
tracing::info!("Done joining removed replication : {}", target);
}

self.replication_metrics.update(RemoveTarget { target });
// TODO(xp): set_replication_metrics_changed() can be removed.
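One side note on the `let _x = handle.await;` line in the hunk above: awaiting a tokio `JoinHandle` yields a `Result<_, JoinError>`, so the join result could also be inspected rather than discarded. A small sketch (not part of this commit) of what that would look like:

```rust
use tokio::task::JoinHandle;

async fn join_replication_task(handle: JoinHandle<()>) {
    match handle.await {
        Ok(()) => tracing::debug!("replication task exited cleanly"),
        // A JoinError means the task panicked or was cancelled.
        Err(e) if e.is_panic() => tracing::error!("replication task panicked: {}", e),
        Err(e) => tracing::warn!("replication task did not run to completion: {}", e),
    }
}
```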
6 changes: 3 additions & 3 deletions openraft/src/core/client.rs
@@ -214,13 +214,13 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
);
}

pub fn handle_special_log(&mut self, entry: &Entry<C>) {
pub async fn handle_special_log(&mut self, entry: &Entry<C>) {
match &entry.payload {
EntryPayload::Membership(ref m) => {
if m.is_in_joint_consensus() {
// nothing to do
} else {
self.handle_uniform_consensus_committed(&entry.log_id);
self.handle_uniform_consensus_committed(&entry.log_id).await;
}
}
EntryPayload::Blank => {}
@@ -234,7 +234,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
&mut self,
entry: &Entry<C>,
) -> Result<C::R, StorageError<C::NodeId>> {
self.handle_special_log(entry);
self.handle_special_log(entry).await;

// First, we just ensure that we apply any outstanding up to, but not including, the index
// of the given entry. We need to be able to return the data response from applying this
4 changes: 2 additions & 2 deletions openraft/src/core/replication.rs
@@ -126,7 +126,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
Err(_err_str) => {
state.failures += 1;

self.try_remove_replication(target);
self.try_remove_replication(target).await;
return Ok(());
}
};
@@ -148,7 +148,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
}

// Drop replication stream if needed.
if self.try_remove_replication(target) {
if self.try_remove_replication(target).await {
// nothing to do
} else {
self.update_replication_metrics(target, matched);
12 changes: 6 additions & 6 deletions openraft/src/replication/mod.rs
@@ -10,6 +10,7 @@ use tokio::io::AsyncSeek
use tokio::io::AsyncSeekExt;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::interval;
use tokio::time::timeout;
use tokio::time::Duration;
@@ -50,7 +51,8 @@ use crate::Vote;
/// The public handle to a spawned replication stream.
pub(crate) struct ReplicationStream<NID: NodeId> {
/// The spawn handle the `ReplicationCore` task.
// pub handle: JoinHandle<()>,
pub handle: JoinHandle<()>,

/// The channel used for communicating with the replication task.
pub repl_tx: mpsc::UnboundedSender<(RaftEvent<NID>, Span)>,
}
@@ -183,12 +185,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
need_to_replicate: true,
};

let _handle = tokio::spawn(this.main().instrument(tracing::trace_span!("spawn").or_current()));
let handle = tokio::spawn(this.main().instrument(tracing::trace_span!("repl-stream").or_current()));

ReplicationStream {
// handle,
repl_tx,
}
ReplicationStream { handle, repl_tx }
}

#[tracing::instrument(level="trace", skip(self), fields(vote=%self.vote, target=display(self.target), cluster=%self.config.cluster_name))]
@@ -691,6 +690,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
);

let res = self.send_append_entries().await;
tracing::debug!(target = display(self.target), res = debug(&res), "replication res",);

if let Err(err) = res {
tracing::error!(error=%err, "error replication to target={}", self.target);
