diff --git a/runtimes/Cargo.toml b/runtimes/Cargo.toml index 025d17a42..dd0d90a14 100644 --- a/runtimes/Cargo.toml +++ b/runtimes/Cargo.toml @@ -2,5 +2,6 @@ members = [ "legacy", "next", + "proto", "wasm" ] diff --git a/runtimes/legacy/Cargo.toml b/runtimes/legacy/Cargo.toml index 20f0833a3..7626dab6d 100644 --- a/runtimes/legacy/Cargo.toml +++ b/runtimes/legacy/Cargo.toml @@ -6,6 +6,7 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0.62" async-trait = "0.1.58" clap ={ version = "4.0.18", features = ["derive"] } thiserror = "1.0.37" @@ -18,6 +19,10 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } version = "0.7.0" path = "../../common" +[dependencies.shuttle-runtime-proto] +version = "0.1.0" +path = "../proto" + [dependencies.shuttle-service] version = "0.7.0" default-features = false diff --git a/runtimes/legacy/README.md b/runtimes/legacy/README.md index 4915ab01f..1f9d0b8be 100644 --- a/runtimes/legacy/README.md +++ b/runtimes/legacy/README.md @@ -2,8 +2,15 @@ Load and run an .so library that implements `shuttle_service::Service`. -To load and run, pass the path to the .so file to load as an argument to the shuttle-next binary: +To test, first start this binary using: ```bash -cargo run -- -f "src/libhello_world.so" +cargo run -- +``` + +Then in another shell, load a `.so` file and start it up: + +``` bash +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "../../examples/rocket/hello-world/target/debug/libhello_world.so"}' localhost:8000 runtime.Runtime/load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/start ``` diff --git a/runtimes/legacy/src/args.rs b/runtimes/legacy/src/args.rs index f35e2b96c..2a2455e3a 100644 --- a/runtimes/legacy/src/args.rs +++ b/runtimes/legacy/src/args.rs @@ -3,10 +3,6 @@ use tonic::transport::Endpoint; #[derive(Parser, Debug)] pub struct Args { - /// Uri to the `.so` file to load - #[arg(long, short)] - pub file_path: String, - /// Address to reach provisioner at #[clap(long, default_value = "localhost:5000")] pub provisioner_address: Endpoint, diff --git a/runtimes/legacy/src/error.rs b/runtimes/legacy/src/error.rs index 919dab37d..9c57cd4e2 100644 --- a/runtimes/legacy/src/error.rs +++ b/runtimes/legacy/src/error.rs @@ -7,6 +7,8 @@ pub enum Error { Load(#[from] LoaderError), #[error("Run error: {0}")] Run(#[from] shuttle_service::Error), + #[error("Start error: {0}")] + Start(#[from] shuttle_service::error::CustomError), } pub type Result = std::result::Result; diff --git a/runtimes/legacy/src/lib.rs b/runtimes/legacy/src/lib.rs index 177d34b24..1cc4e3ad7 100644 --- a/runtimes/legacy/src/lib.rs +++ b/runtimes/legacy/src/lib.rs @@ -1,2 +1,151 @@ +use std::{ + collections::BTreeMap, + net::{Ipv4Addr, SocketAddr}, + path::PathBuf, + str::FromStr, + sync::Mutex, +}; + +use anyhow::anyhow; +use async_trait::async_trait; +use shuttle_common::{database, LogItem}; +use shuttle_runtime_proto::runtime::{ + runtime_server::Runtime, LoadRequest, LoadResponse, StartRequest, StartResponse, +}; +use shuttle_service::{ + loader::{LoadedService, Loader}, + Factory, Logger, ServiceName, +}; +use tokio::sync::mpsc::{self, UnboundedReceiver}; +use tonic::{Request, Response, Status}; +use tracing::{info, instrument, trace}; + pub mod args; pub mod error; + +pub struct Legacy { + // Mutexes are for interior mutability + so_path: Mutex>, + port: Mutex>, +} + +impl Legacy { + pub fn new() -> Self { + Self { + so_path: Mutex::new(None), + port: Mutex::new(None), + } + } +} + +#[async_trait] +impl Runtime for Legacy { + async fn load(&self, request: Request) -> Result, Status> { + let so_path = request.into_inner().path; + trace!(so_path, "loading"); + + let so_path = PathBuf::from(so_path); + *self.so_path.lock().unwrap() = Some(so_path); + + let message = LoadResponse { success: true }; + Ok(Response::new(message)) + } + + async fn start( + &self, + _request: Request, + ) -> Result, Status> { + let port = 8001; + let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); + let mut factory = DummyFactory::new(); + let (logger, _rx) = get_logger(); + let so_path = self + .so_path + .lock() + .unwrap() + .as_ref() + .ok_or_else(|| -> error::Error { + error::Error::Start(anyhow!("trying to start a service that was not loaded")) + }) + .map_err(|err| Status::from_error(Box::new(err)))? + .clone(); + + trace!(%address, "starting"); + let service = load_service(address, so_path, &mut factory, logger) + .await + .unwrap(); + + _ = tokio::spawn(run(service, address)); + + *self.port.lock().unwrap() = Some(port); + + let message = StartResponse { + success: true, + port: Some(port as u32), + }; + + Ok(Response::new(message)) + } +} + +#[instrument(skip(service))] +async fn run(service: LoadedService, addr: SocketAddr) { + let (handle, library) = service; + + info!("starting deployment on {}", addr); + handle.await.unwrap().unwrap(); + + tokio::spawn(async move { + trace!("closing .so file"); + library.close().unwrap(); + }); +} + +#[instrument(skip(addr, so_path, factory, logger))] +async fn load_service( + addr: SocketAddr, + so_path: PathBuf, + factory: &mut dyn Factory, + logger: Logger, +) -> error::Result { + let loader = Loader::from_so_file(so_path)?; + + Ok(loader.load(factory, addr, logger).await?) +} + +struct DummyFactory { + service_name: ServiceName, +} + +impl DummyFactory { + fn new() -> Self { + Self { + service_name: ServiceName::from_str("legacy").unwrap(), + } + } +} + +#[async_trait] +impl Factory for DummyFactory { + fn get_service_name(&self) -> ServiceName { + self.service_name.clone() + } + + async fn get_db_connection_string( + &mut self, + _: database::Type, + ) -> Result { + todo!() + } + + async fn get_secrets(&mut self) -> Result, shuttle_service::Error> { + todo!() + } +} + +fn get_logger() -> (Logger, UnboundedReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + let logger = Logger::new(tx, Default::default()); + + (logger, rx) +} diff --git a/runtimes/legacy/src/main.rs b/runtimes/legacy/src/main.rs index 07c308d32..0a9e381e7 100644 --- a/runtimes/legacy/src/main.rs +++ b/runtimes/legacy/src/main.rs @@ -1,15 +1,10 @@ -use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf, str::FromStr}; +use std::net::{Ipv4Addr, SocketAddr}; -use async_trait::async_trait; use clap::Parser; -use shuttle_common::{database, LogItem}; -use shuttle_legacy::args::Args; -use shuttle_service::{ - loader::{LoadedService, Loader}, - Factory, Logger, ServiceName, -}; -use tokio::sync::mpsc::{self, UnboundedReceiver}; -use tracing::{info, instrument, trace}; +use shuttle_legacy::{args::Args, Legacy}; +use shuttle_runtime_proto::runtime::runtime_server::RuntimeServer; +use tonic::transport::Server; +use tracing::trace; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[tokio::main(flavor = "multi_thread")] @@ -28,76 +23,11 @@ async fn main() { trace!(args = ?args, "parsed args"); - let address: SocketAddr = "127.0.0.1:8000".parse().unwrap(); - let mut factory = DummyFactory::new(); - let (logger, _rx) = get_logger(); - let so_path = PathBuf::from(args.file_path.as_str()); - - let service = load_service(address, so_path, &mut factory, logger) + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000); + let legacy = Legacy::new(); + Server::builder() + .add_service(RuntimeServer::new(legacy)) + .serve(addr) .await .unwrap(); - - _ = tokio::spawn(run(service, address)).await; -} - -#[instrument(skip(service))] -async fn run(service: LoadedService, addr: SocketAddr) { - let (handle, library) = service; - - info!("starting deployment on {}", addr); - handle.await.unwrap().unwrap(); - - tokio::spawn(async move { - trace!("closing .so file"); - library.close().unwrap(); - }); -} - -#[instrument(skip(addr, so_path, factory, logger))] -async fn load_service( - addr: SocketAddr, - so_path: PathBuf, - factory: &mut dyn Factory, - logger: Logger, -) -> shuttle_legacy::error::Result { - let loader = Loader::from_so_file(so_path)?; - - Ok(loader.load(factory, addr, logger).await?) -} - -struct DummyFactory { - service_name: ServiceName, -} - -impl DummyFactory { - fn new() -> Self { - Self { - service_name: ServiceName::from_str("next").unwrap(), - } - } -} - -#[async_trait] -impl Factory for DummyFactory { - fn get_service_name(&self) -> ServiceName { - self.service_name.clone() - } - - async fn get_db_connection_string( - &mut self, - _: database::Type, - ) -> Result { - todo!() - } - - async fn get_secrets(&mut self) -> Result, shuttle_service::Error> { - todo!() - } -} - -fn get_logger() -> (Logger, UnboundedReceiver) { - let (tx, rx) = mpsc::unbounded_channel(); - let logger = Logger::new(tx, Default::default()); - - (logger, rx) } diff --git a/runtimes/next/Cargo.toml b/runtimes/next/Cargo.toml index 924cbad3c..326402a04 100644 --- a/runtimes/next/Cargo.toml +++ b/runtimes/next/Cargo.toml @@ -13,6 +13,8 @@ async-trait = "0.1.58" clap ={ version = "4.0.18", features = ["derive"] } tokio = { version = "1.20.1", features = [ "full" ] } tonic = "0.8.0" +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } cap-std = "*" wasmtime = "*" @@ -20,3 +22,7 @@ wasmtime-wasi = "*" wasi-common = "*" serenity = { version = "0.11.5", default-features = false, features = ["client", "gateway", "rustls_backend", "model"] } + +[dependencies.shuttle-runtime-proto] +version = "0.1.0" +path = "../proto" diff --git a/runtimes/next/README.md b/runtimes/next/README.md index d0da79c83..c247e00fc 100644 --- a/runtimes/next/README.md +++ b/runtimes/next/README.md @@ -4,7 +4,14 @@ ```bash $ cd ..; make wasm -$ DISCORD_TOKEN=xxx BOT_SRC=bot.wasm cargo run +$ DISCORD_TOKEN=xxx cargo run +``` + +In another terminal: + +``` bash +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "bot.wasm"}' localhost:8000 runtime.Runtime/load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/start ``` ## Running the tests diff --git a/runtimes/next/src/lib.rs b/runtimes/next/src/lib.rs index cce82eb43..273a2eea0 100644 --- a/runtimes/next/src/lib.rs +++ b/runtimes/next/src/lib.rs @@ -1,5 +1,6 @@ pub mod args; +use std::env; use std::fs::File; use std::io::{Read, Write}; use std::os::unix::prelude::RawFd; @@ -7,16 +8,71 @@ use std::path::Path; use std::sync::Arc; use async_trait::async_trait; - -use serenity::{model::prelude::*, prelude::*}; - use cap_std::os::unix::net::UnixStream; +use serenity::{model::prelude::*, prelude::*}; +use shuttle_runtime_proto::runtime::runtime_server::Runtime; +use shuttle_runtime_proto::runtime::{LoadRequest, LoadResponse, StartRequest, StartResponse}; +use tonic::{Request, Response, Status}; +use tracing::trace; use wasi_common::file::FileCaps; use wasmtime::{Engine, Linker, Module, Store}; use wasmtime_wasi::sync::net::UnixStream as WasiUnixStream; use wasmtime_wasi::{WasiCtx, WasiCtxBuilder}; -pub struct BotBuilder { +pub struct Next { + bot: std::sync::Mutex>, +} + +impl Next { + pub fn new() -> Self { + Self { + bot: std::sync::Mutex::new(None), + } + } +} + +#[async_trait] +impl Runtime for Next { + async fn load(&self, request: Request) -> Result, Status> { + let wasm_path = request.into_inner().path; + trace!(wasm_path, "loading"); + + let bot = Bot::new(wasm_path); + + *self.bot.lock().unwrap() = Some(bot); + + let message = LoadResponse { success: true }; + + Ok(Response::new(message)) + } + + async fn start( + &self, + _request: Request, + ) -> Result, Status> { + let intents = GatewayIntents::GUILD_MESSAGES | GatewayIntents::MESSAGE_CONTENT; + let token = env::var("DISCORD_TOKEN").unwrap(); + let bot: Bot = { + let guard = self.bot.lock().unwrap(); + guard.as_ref().unwrap().clone() + }; + let mut client = bot.into_client(token.as_str(), intents).await; + + trace!("starting bot"); + tokio::spawn(async move { + client.start().await.unwrap(); + }); + + let message = StartResponse { + success: true, + port: None, + }; + + Ok(Response::new(message)) + } +} + +struct BotBuilder { engine: Engine, store: Store, linker: Linker, @@ -71,7 +127,7 @@ impl BotBuilder { } } -pub struct BotInner { +struct BotInner { store: Store, linker: Linker, } @@ -110,7 +166,8 @@ impl BotInner { } } -pub struct Bot { +#[derive(Clone)] +struct Bot { inner: Arc>, } diff --git a/runtimes/next/src/main.rs b/runtimes/next/src/main.rs index 62a00847d..6a2f11848 100644 --- a/runtimes/next/src/main.rs +++ b/runtimes/next/src/main.rs @@ -1,20 +1,33 @@ +use std::net::{Ipv4Addr, SocketAddr}; + use clap::Parser; -use serenity::prelude::*; -use shuttle_next::{args::Args, Bot}; -use std::env; -use std::io; +use shuttle_next::{args::Args, Next}; +use shuttle_runtime_proto::runtime::runtime_server::RuntimeServer; +use tonic::transport::Server; +use tracing::trace; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[tokio::main] -async fn main() -> io::Result<()> { - let _args = Args::parse(); +async fn main() { + let args = Args::parse(); - let intents = GatewayIntents::GUILD_MESSAGES | GatewayIntents::MESSAGE_CONTENT; + let fmt_layer = fmt::layer(); + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .unwrap(); - let token = env::var("DISCORD_TOKEN").unwrap(); - let src = env::var("BOT_SRC").unwrap(); + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .init(); - let mut client = Bot::new(src).into_client(token.as_str(), intents).await; - client.start().await.unwrap(); + trace!(args = ?args, "parsed args"); - Ok(()) + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000); + let next = Next::new(); + Server::builder() + .add_service(RuntimeServer::new(next)) + .serve(addr) + .await + .unwrap(); } diff --git a/runtimes/proto/Cargo.toml b/runtimes/proto/Cargo.toml new file mode 100644 index 000000000..5e255f65d --- /dev/null +++ b/runtimes/proto/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "shuttle-runtime-proto" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +prost = "0.11.0" +tonic = "0.8.0" + +[build-dependencies] +tonic-build = "0.8.0" diff --git a/runtimes/proto/build.rs b/runtimes/proto/build.rs new file mode 100644 index 000000000..a045f1d57 --- /dev/null +++ b/runtimes/proto/build.rs @@ -0,0 +1,5 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("../proto/runtime.proto")?; + + Ok(()) +} diff --git a/runtimes/proto/runtime.proto b/runtimes/proto/runtime.proto new file mode 100644 index 000000000..d26e0c6c8 --- /dev/null +++ b/runtimes/proto/runtime.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; +package runtime; + +service Runtime { + // Load a service file to be ready to start it + rpc load(LoadRequest) returns (LoadResponse); + + // Start a loaded service file + rpc start(StartRequest) returns (StartResponse); +} + +message LoadRequest { + // Name of service to load + string service_name = 1; + + // Path to compiled file to load for service + string path = 2; +} + +message LoadResponse { + // Could the service be loaded + bool success = 1; +} + +message StartRequest { + // Name of service to start + string service_name = 1; +} + +message StartResponse { + // Was the start successful + bool success = 1; + + // Optional port the service was started on + // This is likely to be None for bots + optional uint32 port = 2; +} diff --git a/runtimes/proto/src/lib.rs b/runtimes/proto/src/lib.rs new file mode 100644 index 000000000..70ee4b5b7 --- /dev/null +++ b/runtimes/proto/src/lib.rs @@ -0,0 +1,3 @@ +pub mod runtime { + tonic::include_proto!("runtime"); +}