diff --git a/Cargo.lock b/Cargo.lock index e556fcf2bc..7306d579a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5819,6 +5819,7 @@ name = "shuttle-proto" version = "0.7.0" dependencies = [ "prost", + "prost-types", "shuttle-common", "tonic", "tonic-build", @@ -5864,6 +5865,7 @@ dependencies = [ "shuttle-service", "thiserror", "tokio", + "tokio-stream", "tonic", "tracing", "tracing-subscriber", @@ -6824,9 +6826,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.9" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" dependencies = [ "futures-core", "pin-project-lite 0.2.9", diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 3b64c23d9f..bfe263cc43 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] prost = "0.11.0" +prost-types = "0.11.0" tonic = "0.8.2" [dependencies.shuttle-common] diff --git a/proto/runtime.proto b/proto/runtime.proto index d26e0c6c80..9cbd96220c 100644 --- a/proto/runtime.proto +++ b/proto/runtime.proto @@ -1,12 +1,16 @@ syntax = "proto3"; package runtime; +import "google/protobuf/timestamp.proto"; + service Runtime { // Load a service file to be ready to start it - rpc load(LoadRequest) returns (LoadResponse); + rpc Load(LoadRequest) returns (LoadResponse); // Start a loaded service file - rpc start(StartRequest) returns (StartResponse); + rpc Start(StartRequest) returns (StartResponse); + + rpc SubscribeLogs(SubscribeLogsRequest) returns (stream LogItem); } message LoadRequest { @@ -35,3 +39,36 @@ message StartResponse { // This is likely to be None for bots optional uint32 port = 2; } + +message SubscribeLogsRequest {} + +message LogItem { + string id = 1; + google.protobuf.Timestamp timestamp = 2; + LogState state = 3; + LogLevel level = 4; + optional string file = 5; + optional uint32 line = 6; + string target = 7; + bytes fields = 8; +} + +enum LogState { + Queued = 0; + Building = 1; + Built = 2; + Loading = 3; + Running = 4; + Completed = 5; + Stopped = 6; + Crashed = 7; + Unknown = 50; +} + +enum LogLevel { + Trace = 0; + Debug = 1; + Info = 2; + Warn = 3; + Error = 4; +} diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 6dff3b13e6..d34230bf6e 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -65,5 +65,52 @@ pub mod provisioner { } pub mod runtime { + use std::time::SystemTime; + + use prost_types::Timestamp; + tonic::include_proto!("runtime"); + + impl From for LogItem { + fn from(log: shuttle_common::LogItem) -> Self { + Self { + id: log.id.to_string(), + timestamp: Some(Timestamp::from(SystemTime::from(log.timestamp))), + state: LogState::from(log.state) as i32, + level: LogLevel::from(log.level) as i32, + file: log.file, + line: log.line, + target: log.target, + fields: log.fields, + } + } + } + + impl From for LogState { + fn from(state: shuttle_common::deployment::State) -> Self { + match state { + shuttle_common::deployment::State::Queued => Self::Queued, + shuttle_common::deployment::State::Building => Self::Building, + shuttle_common::deployment::State::Built => Self::Built, + shuttle_common::deployment::State::Loading => Self::Loading, + shuttle_common::deployment::State::Running => Self::Running, + shuttle_common::deployment::State::Completed => Self::Completed, + shuttle_common::deployment::State::Stopped => Self::Stopped, + shuttle_common::deployment::State::Crashed => Self::Crashed, + shuttle_common::deployment::State::Unknown => Self::Unknown, + } + } + } + + impl From for LogLevel { + fn from(level: shuttle_common::log::Level) -> Self { + match level { + shuttle_common::log::Level::Trace => Self::Trace, + shuttle_common::log::Level::Debug => Self::Debug, + shuttle_common::log::Level::Info => Self::Info, + shuttle_common::log::Level::Warn => Self::Warn, + shuttle_common::log::Level::Error => Self::Error, + } + } + } } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 8dab41bc03..8c8f9f37a7 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -13,6 +13,7 @@ clap ={ version = "4.0.18", features = ["derive"] } serenity = { version = "0.11.5", default-features = false, features = ["client", "gateway", "rustls_backend", "model"] } thiserror = "1.0.37" tokio = { version = "=1.20.1", features = ["full"] } +tokio-stream = "0.1.11" tonic = "0.8.2" tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } diff --git a/runtime/README.md b/runtime/README.md index 843951e75e..ac36e29686 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -8,8 +8,9 @@ $ DISCORD_TOKEN=xxx cargo run In another terminal: ``` bash -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/bot.wasm"}' localhost:8000 runtime.Runtime/load -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/start +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/bot.wasm"}' localhost:8000 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start +grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs ``` ## shuttle-legacy @@ -30,8 +31,9 @@ cargo run -- --legacy --provisioner-address http://localhost:8000 Then in another shell, load a `.so` file and start it up: ``` bash -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "examples/rocket/hello-world/target/debug/libhello_world.so"}' localhost:8000 runtime.Runtime/load -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/start +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "examples/rocket/hello-world/target/debug/libhello_world.so"}' localhost:8000 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start +grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs ``` ## Running the tests diff --git a/runtime/src/legacy/mod.rs b/runtime/src/legacy/mod.rs index 89a688f870..b4b1950d5b 100644 --- a/runtime/src/legacy/mod.rs +++ b/runtime/src/legacy/mod.rs @@ -1,5 +1,6 @@ use std::{ net::{Ipv4Addr, SocketAddr}, + ops::DerefMut, path::PathBuf, str::FromStr, sync::Mutex, @@ -10,13 +11,17 @@ use async_trait::async_trait; use shuttle_common::LogItem; use shuttle_proto::{ provisioner::provisioner_client::ProvisionerClient, - runtime::{runtime_server::Runtime, LoadRequest, LoadResponse, StartRequest, StartResponse}, + runtime::{ + self, runtime_server::Runtime, LoadRequest, LoadResponse, StartRequest, StartResponse, + SubscribeLogsRequest, + }, }; use shuttle_service::{ loader::{LoadedService, Loader}, Factory, Logger, ServiceName, }; use tokio::sync::mpsc::{self, UnboundedReceiver}; +use tokio_stream::wrappers::ReceiverStream; use tonic::{transport::Endpoint, Request, Response, Status}; use tracing::{info, instrument, trace}; @@ -28,6 +33,7 @@ pub struct Legacy { // Mutexes are for interior mutability so_path: Mutex>, port: Mutex>, + logs_rx: Mutex>>, provisioner_address: Endpoint, } @@ -36,6 +42,7 @@ impl Legacy { Self { so_path: Mutex::new(None), port: Mutex::new(None), + logs_rx: Mutex::new(None), provisioner_address, } } @@ -71,7 +78,8 @@ impl Runtime for Legacy { let mut factory = abstract_factory.get_factory(service_name); - let (logger, _rx) = get_logger(); + let (logger, rx) = get_logger(); + *self.logs_rx.lock().unwrap() = Some(rx); let so_path = self .so_path @@ -100,6 +108,29 @@ impl Runtime for Legacy { Ok(Response::new(message)) } + + type SubscribeLogsStream = ReceiverStream>; + + async fn subscribe_logs( + &self, + _request: Request, + ) -> Result, Status> { + let logs_rx = self.logs_rx.lock().unwrap().deref_mut().take(); + + if let Some(mut logs_rx) = logs_rx { + let (tx, rx) = mpsc::channel(1); + + tokio::spawn(async move { + while let Some(log) = logs_rx.recv().await { + tx.send(Ok(log.into())).await.unwrap(); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } else { + Err(Status::internal("logs have already been subscribed to")) + } + } } #[instrument(skip(service))] diff --git a/runtime/src/next/mod.rs b/runtime/src/next/mod.rs index b6f15c6714..c14d6be78e 100644 --- a/runtime/src/next/mod.rs +++ b/runtime/src/next/mod.rs @@ -9,7 +9,10 @@ use async_trait::async_trait; use cap_std::os::unix::net::UnixStream; use serenity::{model::prelude::*, prelude::*}; use shuttle_proto::runtime::runtime_server::Runtime; -use shuttle_proto::runtime::{LoadRequest, LoadResponse, StartRequest, StartResponse}; +use shuttle_proto::runtime::{ + self, LoadRequest, LoadResponse, StartRequest, StartResponse, SubscribeLogsRequest, +}; +use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tracing::trace; use wasi_common::file::FileCaps; @@ -68,6 +71,15 @@ impl Runtime for Next { Ok(Response::new(message)) } + + type SubscribeLogsStream = ReceiverStream>; + + async fn subscribe_logs( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } } struct BotBuilder {