Skip to content

Commit

Permalink
Clear pools of devices before retrying them
Browse files Browse the repository at this point in the history
  • Loading branch information
sosthene-nitrokey committed Sep 16, 2024
1 parent 761ca3c commit 00016f9
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkcs11/src/backend/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl std::fmt::Display for LoginError {

/// Perform a health check with a timeout of 1 second
fn health_check_get_timeout(instance: &InstanceData) -> bool {
instance.config.client.clear_pool();
let config = &instance.config;
let uri_str = format!("{}/health/ready", config.base_path);
let mut req = config.client.get(&uri_str).timeout(Duration::from_secs(1));
Expand Down
1 change: 1 addition & 0 deletions pkcs11/src/config/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ fn background_timer(
fn background_thread(rx: mpsc::Receiver<InstanceData>) -> impl FnOnce() {
move || loop {
while let Ok(instance) = rx.recv() {
instance.config.client.clear_pool();
match health_ready_get(&instance.config) {
Ok(_) => instance.clear_failed(),
Err(_) => instance.bump_failed(),
Expand Down
62 changes: 41 additions & 21 deletions pkcs11/tests/tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,35 +197,54 @@ impl TestContext {
}
}

impl Drop for TestDropper {
fn drop(&mut self) {
impl TestDropper {
fn clear(&mut self) {
for p in self.context.blocked_ports.iter().cloned() {
TestContext::unblock(p);
}
println!("Finished unblocking ports");
}
}

static PROXY_SENDER: LazyLock<UnboundedSender<(u16, u16, broadcast::Sender<()>)>> =
LazyLock::new(|| {
let (tx, mut rx) = unbounded_channel();
std::thread::spawn(move || {
runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap()
.block_on(async move {
let mut tasks = Vec::new();
while let Some((from_port, to_port, sender)) = rx.recv().await {
tasks.push(tokio::spawn(proxy(from_port, to_port, sender)));
}
for task in tasks {
task.abort();
impl Drop for TestDropper {
fn drop(&mut self) {
self.clear();
}
}

enum ProxyMessage {
NewProxy(u16, u16, broadcast::Sender<()>),
CloseAll,
}

static PROXY_SENDER: LazyLock<UnboundedSender<ProxyMessage>> = LazyLock::new(|| {
let (tx, mut rx) = unbounded_channel();
std::thread::spawn(move || {
runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap()
.block_on(async move {
let mut tasks = Vec::new();
while let Some(msg) = rx.recv().await {
match msg {
ProxyMessage::NewProxy(from_port, to_port, sender) => {
tasks.push(tokio::spawn(proxy(from_port, to_port, sender)))
}
ProxyMessage::CloseAll => {
for task in mem::take(&mut tasks) {
task.abort();
}
}
}
})
});
tx
}
for task in tasks {
task.abort();
}
})
});
tx
});

async fn proxy(from_port: u16, to_port: u16, stall_sender: broadcast::Sender<()>) {
let listener = TcpListener::bind(((Ipv4Addr::from([127, 0, 0, 1])), from_port))
Expand Down Expand Up @@ -368,7 +387,7 @@ pub fn run_tests(

for (in_port, out_port) in proxies {
PROXY_SENDER
.send((
.send(ProxyMessage::NewProxy(
*in_port,
*out_port,
test_dropper.context.stall_connections.clone(),
Expand All @@ -384,5 +403,6 @@ pub fn run_tests(
let mut ctx = Ctx::new_and_initialize("../target/release/libnethsm_pkcs11.so").unwrap();
f(&mut test_dropper.context, &mut ctx);
ctx.close_all_sessions(0).unwrap();
PROXY_SENDER.send(ProxyMessage::CloseAll).unwrap();
println!("Ending test");
}

0 comments on commit 00016f9

Please sign in to comment.