Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor main loop in raft node of five node example to improve readability #549

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 40 additions & 37 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,51 +64,54 @@ fn main() {
let rx_stop_clone = Arc::clone(&rx_stop);
let logger = logger.clone();
// Here we spawn the node on a new thread and keep a handle so we can join on them later.
let handle = thread::spawn(move || loop {
thread::sleep(Duration::from_millis(10));
let handle = thread::spawn(move || {
// The main loop of the node.
loop {
// Step raft messages.
match node.my_mailbox.try_recv() {
Ok(msg) => node.step(msg, &logger),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return,
thread::sleep(Duration::from_millis(10));
loop {
// Step raft messages.
match node.my_mailbox.try_recv() {
Ok(msg) => node.step(msg, &logger),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return,
}
}
}

let raft_group = match node.raft_group {
Some(ref mut r) => r,
// When Node::raft_group is `None` it means the node is not initialized.
_ => continue,
};
let raft_group = match node.raft_group {
Some(ref mut r) => r,
// When Node::raft_group is `None` it means the node is not initialized.
_ => continue,
};

if t.elapsed() >= Duration::from_millis(100) {
// Tick the raft.
raft_group.tick();
t = Instant::now();
}
if t.elapsed() >= Duration::from_millis(100) {
// Tick the raft.
raft_group.tick();
t = Instant::now();
}

// Let the leader pick pending proposals from the global queue.
if raft_group.raft.state == StateRole::Leader {
// Handle new proposals.
let mut proposals = proposals.lock().unwrap();
for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) {
propose(raft_group, p);
// Let the leader pick pending proposals from the global queue.
if raft_group.raft.state == StateRole::Leader {
// Handle new proposals.
let mut proposals = proposals.lock().unwrap();
for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) {
propose(raft_group, p);
}
}
}

// Handle readies from the raft.
on_ready(
raft_group,
&mut node.kv_pairs,
&node.mailboxes,
&proposals,
&logger,
);
// Handle readies from the raft.
on_ready(
raft_group,
&mut node.kv_pairs,
&node.mailboxes,
&proposals,
&logger,
);

// Check control signals from the main thread.
if check_signals(&rx_stop_clone) {
return;
};
// Check control signals from the main thread.
if check_signals(&rx_stop_clone) {
return;
};
}
});
handles.push(handle);
}
Expand Down
24 changes: 12 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ The `Ready` state contains quite a bit of information, and you need to check and
by one:

1. Check whether `messages` is empty or not. If not, it means that the node will send messages to
other nodes:
other nodes:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary space?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary space?

This is a compiler error from CI, so I made this change.

CI output:

error: doc list item missing indentation
   --> src/lib.rs:204:1
    |
204 | other nodes:
    | ^
    |
    = help: if this is supposed to be its own paragraph, add a blank line
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_lazy_continuation
note: the lint level is defined here
   --> src/lib.rs:478:9
    |
478 | #![deny(clippy::all)]
    |         ^^^^^^^^^^^
    = note: `#[deny(clippy::doc_lazy_continuation)]` implied by `#[deny(clippy::all)]`
help: indent this line
    |
204 |    other nodes:
    | +++

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


```rust
# use slog::{Drain, o};
Expand All @@ -226,7 +226,7 @@ other nodes:
```

2. Check whether `snapshot` is empty or not. If not empty, it means that the Raft node has received
a Raft snapshot from the leader and we must apply the snapshot:
a Raft snapshot from the leader and we must apply the snapshot:

```rust
# use slog::{Drain, o};
Expand Down Expand Up @@ -254,8 +254,8 @@ a Raft snapshot from the leader and we must apply the snapshot:
```

3. Check whether `committed_entries` is empty or not. If not, it means that there are some newly
committed log entries which you must apply to the state machine. Of course, after applying, you
need to update the applied index and resume `apply` later:
committed log entries which you must apply to the state machine. Of course, after applying, you
need to update the applied index and resume `apply` later:

```rust
# use slog::{Drain, o};
Expand Down Expand Up @@ -310,7 +310,7 @@ need to update the applied index and resume `apply` later:
after restarting, *it may work but potential log loss may also be ignored silently*.

4. Check whether `entries` is empty or not. If not empty, it means that there are newly added
entries but have not been committed yet, we must append the entries to the Raft log:
entries but have not been committed yet, we must append the entries to the Raft log:

```rust
# use slog::{Drain, o};
Expand All @@ -335,8 +335,8 @@ entries but have not been committed yet, we must append the entries to the Raft
```

5. Check whether `hs` is empty or not. If not empty, it means that the `HardState` of the node has
changed. For example, the node may vote for a new leader, or the commit index has been increased.
We must persist the changed `HardState`:
changed. For example, the node may vote for a new leader, or the commit index has been increased.
We must persist the changed `HardState`:

```rust
# use slog::{Drain, o};
Expand All @@ -360,7 +360,7 @@ We must persist the changed `HardState`:
```

6. Check whether `persisted_messages` is empty or not. If not, it means that the node will send messages to
other nodes after persisting hardstate, entries and snapshot:
other nodes after persisting hardstate, entries and snapshot:

```rust
# use slog::{Drain, o};
Expand All @@ -385,8 +385,8 @@ other nodes after persisting hardstate, entries and snapshot:
```

7. Call `advance` to notify that the previous work is completed. Get the return value `LightReady`
and handle its `messages` and `committed_entries` like step 1 and step 3 does. Then call `advance_apply`
to advance the applied index inside.
and handle its `messages` and `committed_entries` like step 1 and step 3 does. Then call `advance_apply`
to advance the applied index inside.

```rust
# use slog::{Drain, o};
Expand Down Expand Up @@ -470,8 +470,8 @@ This process is a two-phase process, during the midst of it the peer group's lea
**two independent, possibly overlapping peer sets**.

> **Note:** In order to maintain resiliency guarantees (progress while a majority of both peer sets is
active), it is recommended to wait until the entire peer group has exited the transition phase
before taking old, removed peers offline.
> active), it is recommended to wait until the entire peer group has exited the transition phase
> before taking old, removed peers offline.

*/

Expand Down
2 changes: 1 addition & 1 deletion src/quorum/majority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::collections::hash_set::Iter;
use std::fmt::Formatter;
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::{cmp, slice, u64};
use std::{cmp, slice};

/// A set of IDs that uses majority quorums to make decisions.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
Expand Down
1 change: 1 addition & 0 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ impl Ready {
/// MustSync is false if and only if
/// 1. no HardState or only its commit is different from before
/// 2. no Entries and Snapshot
///
/// If it's false, an asynchronous write of HardState is permissible before calling
/// [`RawNode::on_persist_ready`] or [`RawNode::advance`] or its families.
#[inline]
Expand Down
2 changes: 1 addition & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ mod test {
new_entry(5, 5),
new_entry(6, 6),
];
let max_u64 = u64::max_value();
let max_u64 = u64::MAX;
let mut tests = vec![
(
2,
Expand Down
1 change: 0 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

use std::fmt;
use std::fmt::Write;
use std::u64;

use slog::{OwnedKVList, Record, KV};

Expand Down
Loading