Skip to content

Commit

Permalink
[server] Fix duplicate log reconciliation messages #1438 (#1441)
Browse files Browse the repository at this point in the history
* Update forward.rs

* Update agdb_server.yaml

* fix conditions
  • Loading branch information
michaelvlach authored Jan 2, 2025
1 parent 0fe7a8a commit db42939
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/agdb_server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: taiki-e/install-action@cargo-llvm-cov
- run: rustup component add llvm-tools-preview
- run: cargo llvm-cov --package agdb_server --all-features --ignore-filename-regex "agdb(.|..)src|agdb_derive|agdb_api|api.rs" --show-missing-lines
- run: cargo llvm-cov --package agdb_server --all-features --ignore-filename-regex "agdb(.|..)src|agdb_derive|agdb_api|api.rs" --fail-uncovered-functions 17 --show-missing-lines

agdb_server_test:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion agdb_server/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub(crate) async fn forward_to_leader(
} else {
return Response::builder()
.status(StatusCode::SERVICE_UNAVAILABLE)
.body("Cluster is not ready yet".into())
.body("cluster is not ready yet".into())
.expect("cluster not ready yet response");
}
}
Expand Down
61 changes: 36 additions & 25 deletions agdb_server/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,13 @@ impl<T: Clone, N, S: Storage<T, N>> Cluster<T, N, S> {
);

for log in logs {
self.validate_log_append(request, log)?;
self.append_storage(log)
.await
.map_err(|e| self.commit_error(request, e.description))?;
if self.validate_log_append(request, log)? {
self.append_storage(log)
.await
.map_err(|e| self.commit_error(request, e.description))?;
}

if log.index <= request.log_commit {
if log.index <= request.log_commit && self.local().log_commit < log.index {
self.commit_storage(log.index)
.await
.map_err(|e| self.commit_error(request, e.description))?;
Expand Down Expand Up @@ -559,28 +560,38 @@ impl<T: Clone, N, S: Storage<T, N>> Cluster<T, N, S> {
Ok(())
}

fn validate_log_append(&self, request: &Request<T>, log: &Log<T>) -> Result<(), Response> {
if self.local().log_commit >= log.index || log.index > (self.local().log_index + 1) {
return Err(Response {
target: request.index,
result: ResponseType::LogMismatch(LogMismatch {
index: MismatchedValues {
local: Some(self.local().log_index),
requested: Some(log.index),
},
term: MismatchedValues {
local: Some(self.local().log_term),
requested: Some(log.term),
},
commit: MismatchedValues {
local: Some(self.local().log_commit),
requested: Some(log.index),
},
}),
});
fn validate_log_append(&self, request: &Request<T>, log: &Log<T>) -> Result<bool, Response> {
if self.local().log_term == log.term {
if self.local().log_index >= log.index {
return Ok(false);
} else if self.local().log_commit < log.index && self.local().log_index + 1 == log.index
{
return Ok(true);
}
} else if self.local().log_term < log.term
&& self.local().log_commit < log.index
&& self.local().log_index + 1 >= log.index
{
return Ok(true);
}

Ok(())
Err(Response {
target: request.index,
result: ResponseType::LogMismatch(LogMismatch {
index: MismatchedValues {
local: Some(self.local().log_index),
requested: Some(log.index),
},
term: MismatchedValues {
local: Some(self.local().log_term),
requested: Some(log.term),
},
commit: MismatchedValues {
local: Some(self.local().log_commit),
requested: Some(log.index),
},
}),
})
}

fn validate_log_for_vote(&self, request: &Request<T>) -> Result<(), Response> {
Expand Down
2 changes: 1 addition & 1 deletion agdb_server/src/server_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ impl ServerDb {
let logs: Vec<DbId> = t
.exec(
QueryBuilder::select()
.values(["index", "term"])
.values("index")
.search()
.index(COMMITTED)
.value(false)
Expand Down

0 comments on commit db42939

Please sign in to comment.