Slight cleanup of processing function, ability to generate & send measurements from csv using test client in one command

DJAndries committed Jun 20, 2024
1 parent 7f91108 commit 96967a3
Showing 2 changed files with 81 additions and 54 deletions.
misc/test-client/src/main.rs (6 changes: 4 additions & 2 deletions)
@@ -4,6 +4,7 @@ use futures::future::try_join_all;
 use serde::{Deserialize, Serialize};
 use star_constellation::api::*;
 use star_constellation::randomness::testing::LocalFetcher;
+use std::fs::remove_file;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -167,6 +168,8 @@ async fn gen_msgs_from_data_and_save(csv_path: &str, cli_args: &CliArgs) {
 
   let mut new_path = PathBuf::from(csv_path);
   new_path.set_extension("b64l");
+  remove_file(&new_path).ok();
+
   let file = Arc::new(Mutex::new(
     OpenOptions::new()
       .create(true)
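A note on the new remove_file call: set_extension only swaps the final extension, and the output file is then opened with create(true) and, presumably, append mode (the OpenOptions chain is truncated in this view), so deleting any previous output keeps a re-run from appending to stale data; the .ok() discards the error when no such file exists yet. A minimal sketch of the path derivation, using a hypothetical input path:

use std::path::PathBuf;

fn main() {
  // set_extension replaces only the final extension of the file name,
  // so the generated messages land next to the input CSV.
  let mut new_path = PathBuf::from("measurements/typical.csv"); // hypothetical path
  new_path.set_extension("b64l");
  assert_eq!(new_path, PathBuf::from("measurements/typical.b64l"));
}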
@@ -318,12 +321,11 @@ async fn main() {
   if let Some(gen_data_file) = cli_args.gen_data_file.as_ref() {
     println!("Generating messages from data file...");
     gen_msgs_from_data_and_save(gen_data_file, &cli_args).await;
-    return;
   }
 
   if let Some(messages_file) = cli_args.messages_file.as_ref() {
     send_messages_from_file(&cli_args, messages_file).await;
-  } else {
+  } else if cli_args.gen_data_file.is_none() {
     println!("Generating random messages...");
     send_random_messages(&cli_args).await;
   }
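Together with the removed early return, the new gen_data_file.is_none() guard changes what a single invocation does. A stand-alone sketch of the resulting dispatch; booleans stand in for the presence of each optional CLI input, and the action strings are descriptive only, not the real function names:

fn dispatch(gen_data_file: bool, messages_file: bool) -> Vec<&'static str> {
  let mut actions = Vec::new();
  if gen_data_file {
    // no early return anymore, so a send step can follow in the same run
    actions.push("generate messages from data file");
  }
  if messages_file {
    actions.push("send messages from file");
  } else if !gen_data_file {
    // the new guard keeps a generate-only run from falling
    // through to random-message generation
    actions.push("generate and send random messages");
  }
  actions
}

fn main() {
  // the one-command flow from the commit message, presumably achieved
  // by passing both inputs in a single invocation
  assert_eq!(
    dispatch(true, true),
    vec!["generate messages from data file", "send messages from file"]
  );
  assert_eq!(dispatch(true, false), vec!["generate messages from data file"]);
  assert_eq!(dispatch(false, true), vec!["send messages from file"]);
  assert_eq!(dispatch(false, false), vec!["generate and send random messages"]);
}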
src/aggregator/processing.rs (129 changes: 77 additions & 52 deletions)
@@ -1,4 +1,4 @@
-use super::group::GroupedMessages;
+use super::group::{GroupedMessages, MessageChunk};
 use super::recovered::RecoveredMessages;
 use super::report::report_measurements;
 use super::AggregatorError;
@@ -10,6 +10,7 @@ use crate::models::{
 use crate::profiler::{Profiler, ProfilerStat};
 use crate::record_stream::{DynRecordStream, RecordStreamArc};
 use crate::star::{recover_key, recover_msgs, AppSTARError, MsgRecoveryInfo};
+use star_constellation::api::NestedMessage;
 use star_constellation::Error as ConstellationError;
 use std::collections::HashSet;
 use std::sync::{Arc, Mutex};
@@ -48,6 +49,68 @@ pub async fn process_expired_epochs(
   Ok(())
 }
 
+fn drain_chunk_messages_for_threshold(
+  chunk: &mut MessageChunk,
+  threshold: usize,
+) -> Result<Vec<NestedMessage>, AggregatorError> {
+  let mut msgs = Vec::new();
+  if let Some(new_msgs) = chunk.new_msgs.get_mut(&threshold) {
+    msgs.append(new_msgs);
+  }
+  if let Some(pending_msgs) = chunk.pending_msgs.get_mut(&threshold) {
+    for pending_msg in pending_msgs.drain(..) {
+      msgs.push(pending_msg.try_into()?);
+    }
+  }
+  Ok(msgs)
+}
+
+/// Returns None if key recovery process failed, which indicates
+/// that the chunk messages should be stored for a future
+/// recovery attempt. Returns Some with a tuple containing the key,
+/// and the messages drained from the chunk that were used for recovery, if any.
+fn get_recovery_key(
+  epoch: u8,
+  chunk: &mut MessageChunk,
+  recovery_threshold: Option<usize>,
+  existing_rec_msg: Option<&&mut RecoveredMessage>,
+) -> Result<Option<(Vec<u8>, Option<Vec<NestedMessage>>)>, AggregatorError> {
+  let mut key_recovery_msgs: Option<Vec<_>> = None;
+
+  // if a recovered msg exists, use the key that was already recovered.
+  // otherwise, recover the key
+  let key = if let Some(rec_msg) = existing_rec_msg {
+    rec_msg.key.clone()
+  } else {
+    let threshold = recovery_threshold.unwrap();
+    let new_msg_count = chunk.new_msgs.len();
+
+    // drain messages required for recovery into the vec
+    let mut msgs = drain_chunk_messages_for_threshold(chunk, threshold)?;
+
+    let key = match recover_key(&msgs, epoch, threshold) {
+      Err(e) => {
+        match e {
+          AppSTARError::Recovery(ConstellationError::ShareRecovery) => {
+            // Store new messages until we receive more shares in the future.
+            for msg in msgs.drain(..new_msg_count) {
+              chunk.new_msgs.get_mut(&threshold).unwrap().push(msg);
+            }
+            return Ok(None);
+          }
+          _ => return Err(e.into()),
+        };
+      }
+      Ok(key) => key,
+    };
+    // cache the messages used for key recovery, so they can be used
+    // for measurement recovery
+    key_recovery_msgs = Some(msgs);
+    key
+  };
+  Ok(Some((key, key_recovery_msgs)))
+}
+
 fn process_one_layer(
   grouped_msgs: &mut GroupedMessages,
   rec_msgs: &mut RecoveredMessages,
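Both halves of drain_chunk_messages_for_threshold empty their source in place: Vec::append moves every element out of the stored vector, and drain(..) yields owned values while clearing its source. That is what allows the ShareRecovery path above to push the new messages back into chunk.new_msgs for a later attempt. A minimal sketch of the pattern, with String and plain HashMaps standing in for the NestedMessage and MessageChunk types:

use std::collections::HashMap;

fn drain_for_threshold(
  new_msgs: &mut HashMap<usize, Vec<String>>,
  pending_msgs: &mut HashMap<usize, Vec<String>>,
  threshold: usize,
) -> Vec<String> {
  let mut msgs = Vec::new();
  if let Some(new) = new_msgs.get_mut(&threshold) {
    // append moves all elements, leaving the stored Vec empty but allocated
    msgs.append(new);
  }
  if let Some(pending) = pending_msgs.get_mut(&threshold) {
    // drain(..) also empties its source while handing out owned values
    msgs.extend(pending.drain(..));
  }
  msgs
}

fn main() {
  let mut new_msgs = HashMap::from([(50, vec!["a".to_string()])]);
  let mut pending_msgs = HashMap::from([(50, vec!["b".to_string()])]);
  let msgs = drain_for_threshold(&mut new_msgs, &mut pending_msgs, 50);
  assert_eq!(msgs, vec!["a".to_string(), "b".to_string()]);
  // both sources are now empty, so messages can be pushed back on failure
  assert!(new_msgs[&50].is_empty() && pending_msgs[&50].is_empty());
}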
@@ -71,71 +134,33 @@
 
   let has_pending_msgs = chunk.pending_msgs.values().any(|v| !v.is_empty());
 
-  // create vec to concat new messages from kafka, and pending messages from PG
-  // for key recovery
-  let mut key_recovery_msgs: Option<Vec<_>> = None;
-
-  // if a recovered msg exists, use the key that was already recovered.
-  // otherwise, recover the key
-  let key = if let Some(rec_msg) = existing_rec_msg.as_ref() {
-    rec_msg.key.clone()
-  } else {
-    let threshold = recovery_threshold.unwrap();
-    let new_msg_count = chunk.new_msgs.len();
-
-    let mut msgs = Vec::new();
-    // drain messages required for recovery into the vec
-    msgs.append(chunk.new_msgs.get_mut(&threshold).unwrap());
-    if let Some(pending_msgs) = chunk.pending_msgs.get_mut(&threshold) {
-      for pending_msg in pending_msgs.drain(..) {
-        msgs.push(pending_msg.try_into()?);
-      }
-    }
-
-    let key = match recover_key(&msgs, *epoch, threshold) {
-      Err(e) => {
-        match e {
-          AppSTARError::Recovery(ConstellationError::ShareRecovery) => {
-            // Store new messages until we receive more shares in the future.
-            for msg in msgs.drain(..new_msg_count) {
-              chunk.new_msgs.get_mut(&threshold).unwrap().push(msg);
-            }
-            continue;
-          }
-          _ => return Err(e.into()),
-        };
-      }
-      Ok(key) => key,
-    };
-    // cache the messages used for key recovery, so they can be used
-    // for measurement recovery
-    key_recovery_msgs = Some(msgs);
-    key
-  };
+  let (key, mut key_recovery_msgs) =
+    match get_recovery_key(*epoch, chunk, recovery_threshold, existing_rec_msg.as_ref())? {
+      Some(res) => res,
+      None => {
+        // key recovery failed. stop processing for the current message chunk/tag,
+        // save messages in db for later attempt
+        continue;
+      }
+    };
 
   let mut msgs_len = 0i64;
   let mut metric_name: Option<String> = None;
   let mut metric_value: Option<String> = None;
   let mut has_children = false;
 
   let mut thresholds = HashSet::new();
-  thresholds.extend(chunk.new_msgs.keys());
-  thresholds.extend(chunk.pending_msgs.keys());
+  thresholds.extend(chunk.new_msgs.keys().chain(chunk.pending_msgs.keys()));
 
   // recover each k-threshold group separately so we store
   // new nested pending messages with the correct threshold value
   for threshold in thresholds {
     let msgs = if recovery_threshold == Some(threshold) && key_recovery_msgs.is_some() {
       // messages for this threshold were already drained in the key
       // recovery step, so use this existing vec
       key_recovery_msgs.take().unwrap()
     } else {
-      let mut msgs = Vec::new();
-      if let Some(new_msgs) = chunk.new_msgs.get_mut(&threshold) {
-        msgs.append(new_msgs);
-      }
-      if let Some(pending_msgs) = chunk.pending_msgs.get_mut(&threshold) {
-        for pending_msg in pending_msgs.drain(..) {
-          msgs.push(pending_msg.try_into()?);
-        }
-      }
-      msgs
+      drain_chunk_messages_for_threshold(chunk, threshold)?
     };
 
     if msgs.is_empty() {
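One subtlety carried over into the new get_recovery_key signature: existing_rec_msg is an Option<&&mut RecoveredMessage> because the call site above holds an Option<&mut RecoveredMessage> and passes existing_rec_msg.as_ref(), which borrows the option's contents rather than moving the mutable borrow. A minimal illustration with i32 in place of RecoveredMessage:

fn peek(existing: Option<&&mut i32>) -> Option<i32> {
  // two dereferences: &&mut i32 -> &mut i32 -> i32
  existing.map(|r| **r)
}

fn main() {
  let mut value = 42;
  let holder: Option<&mut i32> = Some(&mut value);
  // as_ref() turns &Option<&mut i32> into Option<&&mut i32>,
  // so holder keeps its mutable borrow for later use
  assert_eq!(peek(holder.as_ref()), Some(42));
  if let Some(r) = holder {
    *r += 1;
  }
  assert_eq!(value, 43);
}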
