Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Add create user and login to cluster log #1044 #1374

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ab0e684
wip
michaelvlach Dec 9, 2024
d3fdc2a
fix
michaelvlach Dec 9, 2024
3064d87
separate test
michaelvlach Dec 9, 2024
e5e3a55
Update agdb_server.yaml
michaelvlach Dec 9, 2024
93eede2
Merge branch 'main' into 1044-server-add-create-user-and-login-to-clu…
michaelvlach Dec 9, 2024
06a6394
Update app.rs
michaelvlach Dec 9, 2024
496ef5b
Update routes.rs
michaelvlach Dec 9, 2024
8b263bf
Merge branch '1044-server-add-create-user-and-login-to-cluster-log' o…
michaelvlach Dec 9, 2024
81fb3bd
Update misc_routes.rs
michaelvlach Dec 9, 2024
809c5f6
Update cluster_test.rs
michaelvlach Dec 9, 2024
25f5fd8
Update cluster_test.rs
michaelvlach Dec 9, 2024
536fe4c
Update cluster_test.rs
michaelvlach Dec 9, 2024
9d32c4b
Update cluster_test.rs
michaelvlach Dec 9, 2024
c0b7cb4
Update cluster_test.rs
michaelvlach Dec 9, 2024
cb0292c
Update test_server.rs
michaelvlach Dec 9, 2024
151c9d9
merge tests
michaelvlach Dec 9, 2024
8792f37
Update test_server.rs
michaelvlach Dec 9, 2024
3e11efa
Update test_server.rs
michaelvlach Dec 9, 2024
4f72f4f
Update test_server.rs
michaelvlach Dec 9, 2024
a019fdc
Update test_server.rs
michaelvlach Dec 9, 2024
07ed1b1
Update test_server.rs
michaelvlach Dec 10, 2024
14f925a
Update test_server.rs
michaelvlach Dec 10, 2024
a7a4fbd
swap log levels
michaelvlach Dec 10, 2024
788ade8
debug
michaelvlach Dec 10, 2024
ab52cdb
Update agdb_server.yaml
michaelvlach Dec 10, 2024
afd04c5
Update main.rs
michaelvlach Dec 10, 2024
5787380
Update main.rs
michaelvlach Dec 10, 2024
0fafa1d
Update agdb_server.yaml
michaelvlach Dec 10, 2024
39b1456
Update test_server.rs
michaelvlach Dec 10, 2024
729093a
Update cluster_test.rs
michaelvlach Dec 10, 2024
8bd42ad
Update test_server.rs
michaelvlach Dec 10, 2024
d524242
Update test_server.rs
michaelvlach Dec 10, 2024
87bee3b
Update agdb_server.yaml
michaelvlach Dec 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/agdb_server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: taiki-e/install-action@cargo-llvm-cov
- run: rustup component add llvm-tools-preview
- run: cargo llvm-cov --package agdb_server --all-features --no-report --ignore-filename-regex "agdb(.|..)src|agdb_derive|agdb_api|api.rs" --fail-uncovered-functions 23 --show-missing-lines
- run: cargo llvm-cov --package agdb_server --all-features --no-report --ignore-filename-regex "agdb(.|..)src|agdb_derive|agdb_api|api.rs" --fail-uncovered-functions 18 --show-missing-lines

agdb_server_test:
runs-on: ubuntu-latest
Expand Down
36 changes: 36 additions & 0 deletions agdb_server/src/action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
pub(crate) mod user_add;

use crate::action::user_add::UserAdd;
use crate::db_pool::DbPool;
use crate::server_db::ServerDb;
use crate::server_error::ServerResult;
use serde::Deserialize;
use serde::Serialize;

#[derive(Clone, Serialize, Deserialize)]
pub(crate) enum ClusterAction {
UserAdd(user_add::UserAdd),
}

#[derive(Clone, Serialize, Deserialize)]
pub(crate) enum ClusterResponse {
None,
}

pub(crate) trait Action: Sized {
async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool) -> ServerResult<ClusterResponse>;
}

impl Action for ClusterAction {
async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool) -> ServerResult<ClusterResponse> {
match self {
ClusterAction::UserAdd(action) => action.exec(db, db_pool).await,
}
}
}

impl From<UserAdd> for ClusterAction {
fn from(value: UserAdd) -> Self {
ClusterAction::UserAdd(value)
}
}
31 changes: 31 additions & 0 deletions agdb_server/src/action/user_add.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::server_db::ServerUser;
use crate::server_error::ServerResult;
use agdb::UserValue;
use serde::Deserialize;
use serde::Serialize;

#[derive(Clone, Serialize, Deserialize, UserValue)]
pub(crate) struct UserAdd {
pub(crate) user: String,
pub(crate) password: Vec<u8>,
pub(crate) salt: Vec<u8>,
}

impl Action for UserAdd {
async fn exec(self, db: &mut ServerDb, _db_pool: &mut DbPool) -> ServerResult<ClusterResponse> {
db.insert_user(ServerUser {
db_id: None,
username: self.user,
password: self.password,
salt: self.salt,
token: String::new(),
})
.await?;

Ok(ClusterResponse::None)
}
}
5 changes: 5 additions & 0 deletions agdb_server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::cluster::Cluster;
use crate::config::Config;
use crate::db_pool::DbPool;
use crate::logger;
use crate::redirect;
use crate::routes;
use crate::server_db::ServerDb;
use crate::server_state::ServerState;
Expand Down Expand Up @@ -161,6 +162,10 @@ pub(crate) fn app(
let router = Router::new()
.merge(RapiDoc::with_openapi("/api/v1/openapi.json", Api::openapi()).path("/api/v1"))
.nest("/api/v1", api_v1)
.layer(middleware::from_fn_with_state(
state.clone(),
redirect::cluster_redirect,
))
.layer(middleware::from_fn_with_state(
state.clone(),
logger::logger,
Expand Down
103 changes: 72 additions & 31 deletions agdb_server/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::action::Action;
use crate::action::ClusterAction;
use crate::action::ClusterResponse;
use crate::config::Config;
use crate::db_pool::DbPool;
use crate::raft;
use crate::raft::Log;
use crate::raft::Request;
Expand All @@ -23,25 +27,31 @@ use tokio::sync::RwLock;
pub(crate) type Cluster = Arc<ClusterImpl>;

type ClusterNode = Arc<ClusterNodeImpl>;
type ResultNotifier = tokio::sync::oneshot::Sender<ServerResult<ClusterResponse>>;
type ClusterResponseReceiver = UnboundedReceiver<(Request<ClusterAction>, Response)>;

pub(crate) struct ClusterNodeImpl {
client: ReqwestClient,
url: String,
token: Option<String>,
requests_sender: UnboundedSender<Request>,
requests_receiver: RwLock<UnboundedReceiver<Request>>,
responses: UnboundedSender<(Request, Response)>,
requests_sender: UnboundedSender<Request<ClusterAction>>,
requests_receiver: RwLock<UnboundedReceiver<Request<ClusterAction>>>,
responses: UnboundedSender<(Request<ClusterAction>, Response)>,
}

pub(crate) struct ClusterImpl {
pub(crate) index: usize,
pub(crate) nodes: Vec<ClusterNode>,
pub(crate) raft: Arc<RwLock<raft::Cluster<ClusterStorage>>>,
pub(crate) responses: Option<RwLock<UnboundedReceiver<(Request, Response)>>>,
pub(crate) raft: Arc<RwLock<raft::Cluster<ClusterAction, ResultNotifier, ClusterStorage>>>,
pub(crate) responses: Option<RwLock<ClusterResponseReceiver>>,
}

impl ClusterNodeImpl {
fn new(address: &str, token: &str, responses: UnboundedSender<(Request, Response)>) -> Self {
fn new(
address: &str,
token: &str,
responses: UnboundedSender<(Request<ClusterAction>, Response)>,
) -> Self {
let base = if address.starts_with("http") || address.starts_with("https") {
address.to_string()
} else {
Expand All @@ -60,7 +70,7 @@ impl ClusterNodeImpl {
}
}

async fn send(&self, request: &raft::Request) -> Option<raft::Response> {
async fn send(&self, request: &raft::Request<ClusterAction>) -> Option<raft::Response> {
match self
.client
.post(&self.url, &Some(request), &self.token)
Expand All @@ -79,7 +89,7 @@ impl ClusterNodeImpl {
}
}

pub(crate) async fn new(config: &Config, db: &ServerDb) -> ServerResult<Cluster> {
pub(crate) async fn new(config: &Config, db: &ServerDb, db_pool: &DbPool) -> ServerResult<Cluster> {
let index = config
.cluster
.iter()
Expand All @@ -89,7 +99,7 @@ pub(crate) async fn new(config: &Config, db: &ServerDb) -> ServerResult<Cluster>
config.cluster.iter().map(|url| url.to_string()).collect();
sorted_cluster.sort();
let hash = sorted_cluster.stable_hash();
let storage = ClusterStorage::new(db.clone()).await?;
let storage = ClusterStorage::new(db.clone(), db_pool.clone()).await?;
let settings = raft::ClusterSettings {
index: index as u64,
hash,
Expand Down Expand Up @@ -190,15 +200,25 @@ async fn start_cluster(cluster: Cluster, shutdown_signal: Arc<AtomicBool>) -> Se
Ok(())
}

#[allow(dead_code)]
pub(crate) async fn append(cluster: Cluster, data: Vec<u8>) -> ServerResult<()> {
for request in cluster.raft.write().await.append(data).await {
pub(crate) async fn append<T: Action + Into<ClusterAction>>(
cluster: Cluster,
action: T,
) -> ServerResult<ClusterResponse> {
let (sender, receiver) = tokio::sync::oneshot::channel::<ServerResult<ClusterResponse>>();
let requests = cluster
.raft
.write()
.await
.append(action.into(), Some(sender))
.await?;

for request in requests {
cluster.nodes[request.target as usize]
.requests_sender
.send(request)?;
}

Ok(())
receiver.await?
}

pub(crate) async fn start_with_shutdown(
Expand All @@ -218,71 +238,92 @@ pub(crate) async fn start_with_shutdown(
}

pub(crate) struct ClusterStorage {
logs: VecDeque<Log>,
logs: VecDeque<(Log<ClusterAction>, Option<ResultNotifier>)>,
index: u64,
term: u64,
commit: u64,
db: ServerDb,
db_pool: DbPool,
}

impl ClusterStorage {
async fn new(db: ServerDb) -> ServerResult<Self> {
async fn new(db: ServerDb, db_pool: DbPool) -> ServerResult<Self> {
let (index, term, commit) = db.cluster_log().await?;
Ok(Self {
logs: VecDeque::new(),
index,
term,
commit,
db,
db_pool,
})
}
}

impl Storage for ClusterStorage {
async fn append(&mut self, log: Log) {
impl Storage<ClusterAction, ResultNotifier> for ClusterStorage {
async fn append(&mut self, log: Log<ClusterAction>, notifier: Option<ResultNotifier>) {
if let Some(index) = self
.logs
.iter()
.rev()
.position(|l| l.index == log.index && l.term == log.term)
.position(|(l, _)| l.index == log.index && l.term == log.term)
{
self.logs.truncate(index);
}

self.index = log.index;
self.term = log.term;
self.logs.push_back(log);
self.logs.push_back((log, notifier));
}

async fn commit(&mut self, index: u64) -> ServerResult<()> {
while let Some(log) = self.logs.pop_front() {
if log.index > index {
self.logs.push_front(log);
break;
while let Some((log, _notifier)) = self.logs.front() {
if log.index <= index {
if let Some((log, notifier)) = self.logs.pop_front() {
self.commit = log.index;
self.db.commit_log(&log).await?;

let mut db = self.db.clone();
let mut db_pool = self.db_pool.clone();

tokio::spawn(async move {
let result = log.data.exec(&mut db, &mut db_pool).await;

if let Some(notifier) = notifier {
let _ = notifier.send(result);
}
});
}
} else {
self.commit = log.index;
self.db.commit_log(&log).await?;
//TODO: Execute action
break;
}
}

Ok(())
}

fn current_index(&self) -> u64 {
fn log_index(&self) -> u64 {
self.index
}

fn current_term(&self) -> u64 {
fn log_term(&self) -> u64 {
self.term
}

fn current_commit(&self) -> u64 {
fn log_commit(&self) -> u64 {
self.commit
}

async fn logs(&self, since_index: u64) -> ServerResult<Vec<Log>> {
async fn logs(&self, since_index: u64) -> ServerResult<Vec<Log<ClusterAction>>> {
let mut logs = self.db.logs(since_index).await?;
logs.extend_from_slice(&self.logs.iter().cloned().collect::<Vec<Log>>());
logs.extend_from_slice(
&self
.logs
.iter()
.map(|(log, _)| log)
.cloned()
.collect::<Vec<Log<ClusterAction>>>(),
);
Ok(logs)
}
}
4 changes: 3 additions & 1 deletion agdb_server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod action;
mod api;
mod app;
mod cluster;
Expand All @@ -7,6 +8,7 @@ mod error_code;
mod logger;
mod password;
mod raft;
mod redirect;
mod routes;
mod server_db;
mod server_error;
Expand All @@ -29,8 +31,8 @@ async fn main() -> ServerResult {

let (shutdown_sender, shutdown_receiver) = broadcast::channel::<()>(1);
let server_db = server_db::new(&config).await?;
let cluster = cluster::new(&config, &server_db).await?;
let db_pool = DbPool::new(&config, &server_db).await?;
let cluster = cluster::new(&config, &server_db, &db_pool).await?;

let app = app::app(
cluster.clone(),
Expand Down
Loading
Loading