Skip to content

Commit

Permalink
feat(examples): add QUIC KV server
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
  • Loading branch information
rvolosatovs committed Nov 7, 2024
1 parent a00f4a7 commit cdc4cdd
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 27 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions examples/rust/wasi-keyvalue-quic-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[package]
name = "wasi-keyvalue-quic-server"
version = "0.1.0"

authors.workspace = true
categories.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
"error-context",
"help",
"std",
"suggestions",
"usage",
] }
futures = { workspace = true }
quinn = { workspace = true, features = [
"log",
"platform-verifier",
"ring",
"runtime-tokio",
"rustls",
] }
rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] }
rustls = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"fmt",
] }
wrpc-transport = { workspace = true }
wrpc-transport-quic = { workspace = true }
wrpc-wasi-keyvalue = { workspace = true }
wrpc-wasi-keyvalue-mem = { workspace = true }
147 changes: 147 additions & 0 deletions examples/rust/wasi-keyvalue-quic-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use core::net::SocketAddr;
use core::pin::pin;

use std::sync::Arc;

use anyhow::Context as _;
use clap::Parser;
use futures::stream::select_all;
use futures::StreamExt as _;
use quinn::crypto::rustls::QuicServerConfig;
use quinn::{Endpoint, ServerConfig};
use rcgen::{generate_simple_self_signed, CertifiedKey};
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use rustls::version::TLS13;
use tokio::task::JoinSet;
use tokio::{select, signal};
use tracing::{debug, error, info, warn};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Address to serve `wasi:keyvalue` on
#[arg(default_value = "[::1]:4433")]
addr: SocketAddr,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();

let Args { addr } = Args::parse();

let CertifiedKey { cert, key_pair } = generate_simple_self_signed([
"localhost".to_string(),
"::1".to_string(),
"127.0.0.1".to_string(),
])
.context("failed to generate server certificate")?;
let cert = CertificateDer::from(cert);

let conf = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13])
.with_no_client_auth() // TODO: verify client cert
.with_single_cert(
vec![cert],
PrivatePkcs8KeyDer::from(key_pair.serialize_der()).into(),
)
.context("failed to create server config")?;

let conf: QuicServerConfig = conf
.try_into()
.context("failed to convert rustls client config to QUIC server config")?;

let ep = Endpoint::server(ServerConfig::with_crypto(Arc::new(conf)), addr)
.context("failed to create server endpoint")?;

let srv = Arc::new(wrpc_transport_quic::Server::new());
let accept = tokio::spawn({
let mut tasks = JoinSet::<anyhow::Result<()>>::new();
let srv = Arc::clone(&srv);
async move {
loop {
select! {
Some(conn) = ep.accept() => {
let srv = Arc::clone(&srv);
tasks.spawn(async move {
let conn = conn
.accept()
.context("failed to accept QUIC connection")?;
let conn = conn.await.context("failed to establish QUIC connection")?;
let wrpc = wrpc_transport_quic::Client::from(conn);
loop {
srv.accept(&wrpc)
.await
.context("failed to accept wRPC connection")?;
}
});
}
Some(res) = tasks.join_next() => {
match res {
Ok(Ok(())) => {}
Ok(Err(err)) => {
warn!(?err, "failed to serve connection")
}
Err(err) => {
error!(?err, "failed to join task")
}
}
}
else => {
return;
}
}
}
}
});

let invocations =
wrpc_wasi_keyvalue::serve(srv.as_ref(), wrpc_wasi_keyvalue_mem::Handler::default())
.await
.context("failed to serve `wasi:keyvalue`")?;
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
// to customize this, iterate over the returned `invocations` and set up custom handling per export
let mut invocations = select_all(
invocations
.into_iter()
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
);
let shutdown = signal::ctrl_c();
let mut shutdown = pin!(shutdown);
let mut tasks = JoinSet::new();
loop {
select! {
Some((instance, name, res)) = invocations.next() => {
match res {
Ok(fut) => {
debug!(instance, name, "invocation accepted");
tasks.spawn(async move {
if let Err(err) = fut.await {
warn!(?err, "failed to handle invocation");
} else {
info!(instance, name, "invocation successfully handled");
}
});
}
Err(err) => {
warn!(?err, instance, name, "failed to accept invocation");
}
}
}
Some(res) = tasks.join_next() => {
if let Err(err) = res {
error!(?err, "failed to join task")
}
}
res = &mut shutdown => {
accept.abort();
// wait for all invocations to complete
while let Some(res) = tasks.join_next().await {
if let Err(err) = res {
error!(?err, "failed to join task")
}
}
return res.context("failed to listen for ^C")
}
}
}
}
1 change: 1 addition & 0 deletions examples/rust/wasi-keyvalue-tcp-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::pin::pin;

use std::sync::Arc;

use anyhow::Context as _;
Expand Down
1 change: 1 addition & 0 deletions examples/rust/wasi-keyvalue-unix-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::pin::pin;

use std::path::PathBuf;
use std::sync::Arc;

Expand Down
5 changes: 2 additions & 3 deletions examples/web/rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,8 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
Ok(ep) => ep,
Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
};
let Some(mut san) = url.path().strip_prefix('/') else {
return Ok(Err(store::Error::Other("invalid URL".to_string())));
};
let san = url.path();
let mut san = san.strip_prefix('/').unwrap_or(san);
if san.is_empty() {
san = "localhost"
}
Expand Down
66 changes: 42 additions & 24 deletions examples/web/ui/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@
}
identifier = url;
} else if (conn === 'quic') {
let addr = obj.addr;
let addr = obj['quic-addr'];
if (addr == null || addr === '') {
dbg.error('QUIC address must be set');
return
}
identifier = 'wrpc+quic://' + identifier;
identifier = 'wrpc+quic://' + addr;

let bucket = obj.bucket;
let bucket = obj['quic-bucket'];
if (bucket != null && bucket != '') {
identifier = identifier + ';' + bucket;
}
Expand All @@ -131,7 +131,7 @@
}
identifier = 'wrpc+tcp://' + addr;

let bucket = obj.bucket;
let bucket = obj['tcp-bucket'];
if (bucket != null && bucket != '') {
identifier = identifier + ';' + bucket;
}
Expand All @@ -148,14 +148,14 @@
identifier = identifier + ';' + bucket;
}
} else if (conn === 'web') {
let addr = obj.addr;
let addr = obj['web-addr'];
if (addr == null || addr === '') {
dbg.error('WebTransport address must be set');
return
}
identifier = 'wrpc+web://' + addr;

let bucket = obj.bucket;
let bucket = obj['web-bucket'];
if (bucket != null && bucket != '') {
identifier = identifier + ';' + bucket;
}
Expand Down Expand Up @@ -205,6 +205,10 @@
}

async function handleGet() {
/** @type {HTMLInputElement | null} */
const getValue = document.querySelector('#get input[name="get-value"]');
if (getValue) getValue.value = null;

if (!transport) {
dbg.error('transport not connected')
return
Expand Down Expand Up @@ -260,8 +264,6 @@

dbg.info(`got value from bucket: ${bucketName}`, JSON.stringify(value, null, 2));

/** @type {HTMLInputElement | null} */
const getValue = document.querySelector('#get input[name="get-value"]');
if (getValue) getValue.value = value;
}

Expand Down Expand Up @@ -423,22 +425,7 @@ <h1 class="title">wRPC Transports</h1>
<form id="settings">
<div id="template-output"></div>

<template class="form-fields" data-option="default">
<div class="field">
<label class="label">Bucket identifier</label>
<div class="control">
<input class="input" type="text" name="bucket" />
</div>
</div>
<div class="field">
<label class="label">Target address</label>
<div class="control">
<input class="input" type="text" name="addr" placeholder="localhost:1234" />
</div>
</div>
</template>

<template class="form-fields" data-option="mem"></template>
<template class="form-fields" data-option="default"></template>

<template class="form-fields" data-option="redis">
<div class="field">
Expand Down Expand Up @@ -472,6 +459,21 @@ <h1 class="title">wRPC Transports</h1>
</div>
</template>

<template class="form-fields" data-option="quic">
<div class="field">
<label class="label">Bucket identifier</label>
<div class="control">
<input class="input" type="text" name="quic-bucket" />
</div>
</div>
<div class="field">
<label class="label">QUIC socket address</label>
<div class="control">
<input class="input" type="text" name="quic-addr" placeholder="[::1]:4433" value="[::1]:4433" />
</div>
</div>
</template>

<template class="form-fields" data-option="tcp">
<div class="field">
<label class="label">Bucket identifier</label>
Expand Down Expand Up @@ -502,6 +504,22 @@ <h1 class="title">wRPC Transports</h1>
</div>
</div>
</template>

<template class="form-fields" data-option="web">
<div class="field">
<label class="label">Bucket identifier</label>
<div class="control">
<input class="input" type="text" name="web-bucket" />
</div>
</div>
<div class="field">
<label class="label">WebTransport address</label>
<div class="control">
<input class="input" type="text" name="web-addr" placeholder="localhost:4433"
value="localhost:4433" />
</div>
</div>
</template>
</form>
</section>

Expand Down

0 comments on commit cdc4cdd

Please sign in to comment.