From 6c766b23644c216c331b66a1784cffc831816525 Mon Sep 17 00:00:00 2001 From: stevelr Date: Mon, 14 Nov 2022 17:21:13 -0800 Subject: [PATCH] update to async-nats with jetstream objectstore and new headers api (#120) * update to async-nats with jetstream objectstore and new headers api Signed-off-by: stevelr * remove accidental '#' Signed-off-by: stevelr * update to async-nats 0.21.0 Signed-off-by: stevelr * snyk warning - bring tailwind up to date Signed-off-by: stevelr * (WIP) async-nats 0.21-0.22 changes Signed-off-by: stevelr * add json tags to go codegen; update dependencies; clippy; bump codegen to 0.5.1 Signed-off-by: stevelr * misc clippy fixes Signed-off-by: stevelr * default config Signed-off-by: stevelr * bump to async-nats 0.22.0 Signed-off-by: stevelr * cleanup prometheus conditional, remove unused function Signed-off-by: stevelr * use logging in tests instead of println Signed-off-by: stevelr * tinygo merge Signed-off-by: stevelr * pr feedback Signed-off-by: stevelr * rustfmt Signed-off-by: stevelr * update codegen msrv to 1.60 Signed-off-by: stevelr * misc cleanup; bump msrv to 1.64 due to async-nats requirement Signed-off-by: stevelr * bump macros msrv to 1.56.1 Signed-off-by: stevelr * update changelog Signed-off-by: stevelr * change warning message to recommend upgrade to host v0.59.0 Signed-off-by: stevelr * misc Signed-off-by: stevelr * bump to 0.6.0 Signed-off-by: stevelr * pr feedback; fix span fields Signed-off-by: stevelr Signed-off-by: stevelr --- codegen/Cargo.toml | 6 +- codegen/bin/dump-smithy-model.rs | 4 +- codegen/bin/gen.rs | 2 +- codegen/src/codegen_go.rs | 16 +++-- codegen/src/codegen_py.rs | 2 +- codegen/src/codegen_rust.rs | 1 + codegen/src/docgen.rs | 5 +- codegen/src/gen.rs | 16 ++--- codegen/src/loader.rs | 18 +++-- codegen/src/model.rs | 6 ++ codegen/src/render.rs | 8 +-- macros/Cargo.toml | 1 + rpc-rs/CHANGELOG.md | 8 +++ rpc-rs/Cargo.toml | 38 +++++------ rpc-rs/Makefile | 2 +- rpc-rs/examples/sub.rs | 6 +- 
rpc-rs/src/cbor.rs | 9 +-- rpc-rs/src/chunkify.rs | 109 +++++++++++++++--------------- rpc-rs/src/otel.rs | 34 +++++----- rpc-rs/src/provider.rs | 112 +++++++++++-------------------- rpc-rs/src/provider_main.rs | 34 +++++----- rpc-rs/src/rpc_client.rs | 110 +++++++++++++----------------- rpc-rs/src/timestamp.rs | 2 +- rpc-rs/tests/nats_sub.rs | 48 ++++++------- 24 files changed, 281 insertions(+), 316 deletions(-) diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 2c53736..af52009 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "weld-codegen" -version = "0.5.0" +version = "0.6.0" edition = "2021" authors = [ "wasmcloud Team" ] license = "Apache-2.0" @@ -9,7 +9,7 @@ homepage = "https://github.com/wasmcloud/weld" repository = "https://github.com/wasmcloud/weld" documentation = "https://docs.rs/weld-codegen" readme = "README.md" -rust-version = "1.57.0" +rust-version = "1.60.0" [features] default = [ "wasmbus" ] @@ -26,7 +26,7 @@ atelier_json = "0.2" atelier_smithy = "0.2" bytes = "1.0" cfg-if = "1.0" -clap = { version = "3.1", features = [ "derive" ] } +clap = { version = "4.0.22", features = [ "derive" ] } directories = "4.0" downloader = { version = "0.2", features = ["rustls-tls"], default-features = false } handlebars = "4.0" diff --git a/codegen/bin/dump-smithy-model.rs b/codegen/bin/dump-smithy-model.rs index 193c0cc..9bb9a74 100644 --- a/codegen/bin/dump-smithy-model.rs +++ b/codegen/bin/dump-smithy-model.rs @@ -6,10 +6,10 @@ use std::path::PathBuf; use weld_codegen::{config::CodegenConfig, sources_to_model}; #[derive(Parser, Debug)] -#[clap(author, version, about, long_about = None)] +#[command(author, version, about, long_about = None)] struct Args { /// codegen.toml file (default: "./codegen.toml") - #[clap(short, long)] + #[arg(short, long)] config: Option, } diff --git a/codegen/bin/gen.rs b/codegen/bin/gen.rs index 672700b..54c1d29 100644 --- a/codegen/bin/gen.rs +++ b/codegen/bin/gen.rs @@ 
-19,7 +19,7 @@ fn main() -> Result<()> { } fn load_config(path: &Path) -> Result { - let cfile = std::fs::read_to_string(&path) + let cfile = std::fs::read_to_string(path) .with_context(|| format!("reading config file at {}", &path.display()))?; let folder = path.parent().unwrap().to_path_buf(); let folder = std::fs::canonicalize(folder)?; diff --git a/codegen/src/codegen_go.rs b/codegen/src/codegen_go.rs index f0a31a1..a3d05a2 100644 --- a/codegen/src/codegen_go.rs +++ b/codegen/src/codegen_go.rs @@ -30,7 +30,8 @@ use crate::{ gen::CodeGen, model::{ get_operation, get_sorted_fields, get_trait, is_opt_namespace, value_to_json, - wasmcloud_core_namespace, wasmcloud_model_namespace, CommentKind, PackageName, Ty, + wasmcloud_actor_namespace, wasmcloud_core_namespace, wasmcloud_model_namespace, + CommentKind, PackageName, Ty, }, render::Renderer, writer::Writer, @@ -83,6 +84,7 @@ impl fmt::Display for DecodeRef { } } +#[derive(Default)] #[allow(dead_code)] pub struct GoCodeGen<'model> { /// if set, limits declaration output to this namespace only @@ -345,13 +347,15 @@ impl<'model> CodeGen for GoCodeGen<'model> { w, r#"package {} import ( - {} msgpack "github.com/wasmcloud/tinygo-msgpack" //nolint cbor "github.com/wasmcloud/tinygo-cbor" //nolint + {} )"#, &self.package, - if ns != wasmcloud_model_namespace() && ns != wasmcloud_core_namespace() { - "\"github.com/wasmcloud/actor-tinygo\" //nolint" + if ns == wasmcloud_actor_namespace() { + "core \"github.com/wasmcloud/interfaces/core/tinygo\" //nolint" + } else if ns != wasmcloud_model_namespace() && ns != wasmcloud_core_namespace() { + "actor \"github.com/wasmcloud/actor-tinygo\" //nolint" } else { // avoid circular dependencies - core and model are part of the actor-tinygo package "" @@ -784,9 +788,9 @@ impl<'model> GoCodeGen<'model> { let (fields, _is_numbered) = get_sorted_fields(ident, strukt)?; for member in fields.iter() { self.apply_documentation_traits(w, member.id(), member.traits()); - let (field_name, 
_ser_name) = self.get_field_name_and_ser_name(member)?; + let (field_name, ser_name) = self.get_field_name_and_ser_name(member)?; let target = member.target(); - let field_tags = ""; + let field_tags = format!(r#"`json:"{}"`"#, ser_name); writeln!( w, " {} {} {}", diff --git a/codegen/src/codegen_py.rs b/codegen/src/codegen_py.rs index 95d80f3..c74e5d6 100644 --- a/codegen/src/codegen_py.rs +++ b/codegen/src/codegen_py.rs @@ -115,7 +115,7 @@ impl<'model> CodeGen for PythonCodeGen<'model> { } fn source_formatter(&self, _: Vec) -> Result> { - Ok(Box::new(PythonSourceFormatter::default())) + Ok(Box::::default()) } /// Perform any initialization required prior to code generation for a file diff --git a/codegen/src/codegen_rust.rs b/codegen/src/codegen_rust.rs index 2079986..e6eff25 100644 --- a/codegen/src/codegen_rust.rs +++ b/codegen/src/codegen_rust.rs @@ -56,6 +56,7 @@ struct Declaration(u8, BytesMut); type ShapeList<'model> = Vec<(&'model ShapeID, &'model AppliedTraits, &'model ShapeKind)>; +#[derive(Default)] pub struct RustCodeGen<'model> { /// if set, limits declaration output to this namespace only pub(crate) namespace: Option, diff --git a/codegen/src/docgen.rs b/codegen/src/docgen.rs index df12a40..472bd4b 100644 --- a/codegen/src/docgen.rs +++ b/codegen/src/docgen.rs @@ -5,6 +5,7 @@ use std::{ use atelier_core::model::Model; +use crate::format::NullFormatter; use crate::{ config::{LanguageConfig, OutputFile, OutputLanguage}, error::Result, @@ -74,7 +75,7 @@ impl CodeGen for DocGen { .map(|id| id.to_string()) .collect::>(); - std::fs::create_dir_all(&output_dir).map_err(|e| { + std::fs::create_dir_all(output_dir).map_err(|e| { Error::Io(format!( "creating directory {}: {}", output_dir.display(), @@ -126,6 +127,6 @@ impl CodeGen for DocGen { name.into() } fn source_formatter(&self, _: Vec) -> Result> { - Ok(Box::new(crate::format::NullFormatter::default())) + Ok(Box::::default()) } } diff --git a/codegen/src/gen.rs b/codegen/src/gen.rs index 17cea8c..75333fb 
100644 --- a/codegen/src/gen.rs +++ b/codegen/src/gen.rs @@ -14,12 +14,12 @@ use atelier_core::model::{ HasIdentity as _, Identifier, Model, }; +use crate::docgen::DocGen; use crate::{ codegen_go::GoCodeGen, codegen_py::PythonCodeGen, codegen_rust::RustCodeGen, config::{CodegenConfig, LanguageConfig, OutputFile, OutputLanguage}, - docgen::DocGen, error::{Error, Result}, format::{NullFormatter, SourceFormatter}, model::{get_trait, serialization_trait, CommentKind, NumberedMember}, @@ -226,11 +226,11 @@ fn gen_for_language<'model>( OutputLanguage::Python => Box::new(PythonCodeGen::new(model)), OutputLanguage::TinyGo => Box::new(GoCodeGen::new(model, true)), OutputLanguage::Go => Box::new(GoCodeGen::new(model, false)), - OutputLanguage::Html => Box::new(DocGen::default()), - OutputLanguage::Poly => Box::new(PolyGen::default()), + OutputLanguage::Html => Box::::default(), + OutputLanguage::Poly => Box::::default(), _ => { crate::error::print_warning(&format!("Target language {} not implemented", language)); - Box::new(NoCodeGen::default()) + Box::::default() } } } @@ -494,8 +494,8 @@ fn ensure_files_exist(source_files: &[std::path::PathBuf]) -> Result<()> { } #[derive(Debug, Default)] -struct PolyGen {} -impl CodeGen for PolyGen { +struct PolyCodeGen {} +impl CodeGen for PolyCodeGen { fn output_language(&self) -> OutputLanguage { OutputLanguage::Poly } @@ -515,7 +515,7 @@ impl CodeGen for PolyGen { } fn source_formatter(&self, _: Vec) -> Result> { - Ok(Box::new(NullFormatter::default())) + Ok(Box::::default()) } } @@ -625,6 +625,6 @@ impl CodeGen for NoCodeGen { } fn source_formatter(&self, _: Vec) -> Result> { - Ok(Box::new(NullFormatter::default())) + Ok(Box::::default()) } } diff --git a/codegen/src/loader.rs b/codegen/src/loader.rs index 6bb3687..6607241 100644 --- a/codegen/src/loader.rs +++ b/codegen/src/loader.rs @@ -230,16 +230,14 @@ fn urls_to_cached_files(urls: Vec) -> Result> { Error::Other(format!("internal download error {}", e)) })?; let cache_file = 
weld_cache.join(rel_path); - std::fs::create_dir_all(&cache_file.parent().unwrap()).map_err( - |e| { - Error::Io(format!( - "creating folder {}: {}", - &cache_file.parent().unwrap().display(), - e - )) - }, - )?; - std::fs::copy(&downloaded_file, &cache_file).map_err(|e| { + std::fs::create_dir_all(cache_file.parent().unwrap()).map_err(|e| { + Error::Io(format!( + "creating folder {}: {}", + &cache_file.parent().unwrap().display(), + e + )) + })?; + std::fs::copy(downloaded_file, &cache_file).map_err(|e| { Error::Other(format!( "writing cache file {}: {}", &cache_file.display(), diff --git a/codegen/src/model.rs b/codegen/src/model.rs index 1ee54da..86b2168 100644 --- a/codegen/src/model.rs +++ b/codegen/src/model.rs @@ -23,6 +23,7 @@ use crate::{ const WASMCLOUD_MODEL_NAMESPACE: &str = "org.wasmcloud.model"; const WASMCLOUD_CORE_NAMESPACE: &str = "org.wasmcloud.core"; +const WASMCLOUD_ACTOR_NAMESPACE: &str = "org.wasmcloud.actor"; const TRAIT_CODEGEN_RUST: &str = "codegenRust"; // If any of these are needed, they would have to be defined in core namespace @@ -41,6 +42,8 @@ lazy_static! 
{ NamespaceID::new_unchecked(WASMCLOUD_MODEL_NAMESPACE); static ref WASMCLOUD_CORE_NAMESPACE_ID: NamespaceID = NamespaceID::new_unchecked(WASMCLOUD_CORE_NAMESPACE); + static ref WASMCLOUD_ACTOR_NAMESPACE_ID: NamespaceID = + NamespaceID::new_unchecked(WASMCLOUD_ACTOR_NAMESPACE); static ref SERIALIZATION_TRAIT_ID: ShapeID = ShapeID::new( NamespaceID::new_unchecked(WASMCLOUD_MODEL_NAMESPACE), Identifier::from_str(TRAIT_SERIALIZATION).unwrap(), @@ -81,6 +84,9 @@ pub fn wasmcloud_model_namespace() -> &'static NamespaceID { pub fn wasmcloud_core_namespace() -> &'static NamespaceID { &WASMCLOUD_CORE_NAMESPACE_ID } +pub fn wasmcloud_actor_namespace() -> &'static NamespaceID { + &WASMCLOUD_ACTOR_NAMESPACE_ID +} #[cfg(feature = "wasmbus")] /// shape id of trait @wasmbus diff --git a/codegen/src/render.rs b/codegen/src/render.rs index 7898a52..b82578a 100644 --- a/codegen/src/render.rs +++ b/codegen/src/render.rs @@ -249,7 +249,7 @@ impl HelperDef for NamespaceHelper { ) -> Result, RenderError> { let namespace = arg_as_string(h, 0, "filter_namespace")?; let namespace = NamespaceID::from_str(namespace) - .map_err(|e| RenderError::new(&format!("invalid namespace {}", e)))?; + .map_err(|e| RenderError::new(format!("invalid namespace {}", e)))?; let obj = arg_as_obj(h, 1, "filter_namespace")?; let shapes = obj @@ -466,7 +466,7 @@ fn add_base_helpers(hb: &mut Handlebars) { // get first arg as string let id = arg_as_string(h, 0, "namespace")?; let id = ShapeID::from_str(id).map_err(|e| { - RenderError::new(&format!("invalid shape id {} for namespace_name", e)) + RenderError::new(format!("invalid shape id {} for namespace_name", e)) })?; out.write(&id.namespace().to_string())?; Ok(()) @@ -530,7 +530,7 @@ fn add_base_helpers(hb: &mut Handlebars) { -> HelperResult { let id = arg_as_string(h, 0, "shape_name")?; let id = ShapeID::from_str(id).map_err(|e| { - RenderError::new(&format!("invalid shape id {} for shape_name", e)) + RenderError::new(format!("invalid shape id {} for 
shape_name", e)) })?; out.write(&id.shape_name().to_string())?; Ok(()) @@ -552,7 +552,7 @@ fn add_base_helpers(hb: &mut Handlebars) { -> HelperResult { let id = arg_as_string(h, 0, "member_name")?; let id = Identifier::from_str(id).map_err(|e| { - RenderError::new(&format!("invalid member id {} for member_name", e)) + RenderError::new(format!("invalid member id {} for member_name", e)) })?; out.write(&id.to_string())?; Ok(()) diff --git a/macros/Cargo.toml b/macros/Cargo.toml index e93ce6c..a28a33b 100644 --- a/macros/Cargo.toml +++ b/macros/Cargo.toml @@ -9,6 +9,7 @@ homepage = "https://github.com/wasmcloud/weld" repository = "https://github.com/wasmcloud/weld" documentation = "https://docs.rs/wasmcloud-weld-macros" readme = "README.md" +rust-version = "1.56.1" [lib] proc-macro = true diff --git a/rpc-rs/CHANGELOG.md b/rpc-rs/CHANGELOG.md index 0c9ea32..2141a2f 100644 --- a/rpc-rs/CHANGELOG.md +++ b/rpc-rs/CHANGELOG.md @@ -1,5 +1,13 @@ # wasmbus-rpc Changelog +## 0.11.0-alpha.1 + +- upgrade to async-nats 0.22.0 +- additional logging on reconnect +- add json tags to generated tinygo struct declarations +- bumped chunkify threshold to 900KiB \(900\*1024\) to match host threshold + + ## 0.10.0 - upgrade to async-nats 0.18.0 diff --git a/rpc-rs/Cargo.toml b/rpc-rs/Cargo.toml index 1ebf704..7bdeed0 100644 --- a/rpc-rs/Cargo.toml +++ b/rpc-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wasmbus-rpc" -version = "0.11.0" +version = "0.11.0-alpha.1" authors = [ "wasmcloud Team" ] license = "Apache-2.0" description = "Runtime library for actors and capability providers" @@ -10,7 +10,7 @@ documentation = "https://docs.rs/wasmbus-rpc" readme = "README.md" edition = "2021" # MSRV -rust-version = "1.58.1" +rust-version = "1.64" # don't push build.rs exclude = [ "build.rs" ] @@ -23,7 +23,6 @@ metrics = [ "prometheus" ] otel = ["opentelemetry", "tracing-opentelemetry", "opentelemetry-otlp"] [dependencies] -anyhow = "1.0.57" async-trait = "0.1" base64 = "0.13" bytes 
= "1.1.0" @@ -34,8 +33,7 @@ serde_bytes = "0.11" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" -time = "0.3.13" -tokio-timer = "0.2" +time = "0.3.16" toml = "0.5" tracing = { version = "0.1.34", features = ["log"] } tracing-futures = "0.2" @@ -48,28 +46,28 @@ num-bigint = { version = "0.4", optional = true } bigdecimal = { version = "0.3", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -tokio = { version = "1", features = ["full"] } +async-nats = "0.22.0" +atty = "0.2" +data-encoding = "2.3" futures = "0.3" -async-nats = "0.19.0" -nats = "0.23.0" +lazy_static = "1.4" nkeys = "0.2" once_cell = "1.8" -uuid = { version = "1.0", features = ["v4", "serde"] } -wascap = "0.8.0" +opentelemetry = { version = "0.18", features = ["rt-tokio"], optional = true } +opentelemetry-otlp = { version = "0.11", features = ["http-proto", "reqwest-client"], optional = true } +prometheus = { version = "0.13", optional = true } sha2 = "0.10.2" -data-encoding = "2.3" +tokio = { version = "1", features = ["full"] } +tracing-opentelemetry = { version = "0.18", optional = true } tracing-subscriber = { version = "0.3.7", features = ["env-filter", "json"] } -atty = "0.2" -opentelemetry = { version = "0.17", features = ["rt-tokio"], optional = true } -tracing-opentelemetry = { version = "0.17", optional = true } -lazy_static = "1.4" -opentelemetry-otlp = { version = "0.10", features = ["http-proto", "reqwest-client"], optional = true } - -prometheus = { version = "0.13", optional = true } +uuid = { version = "1.0", features = ["v4", "serde"] } +wascap = "0.8.0" [dev-dependencies] +anyhow = "1.0.66" regex = "1" -clap = { version = "4.0.20", features = ["derive"] } +clap = { version = "4.0.22", features = ["derive"] } +test-log = { version = "0.2.10", default-features = false, features = ["trace"] } [build-dependencies] -weld-codegen = { version = "0.5.0", path = "../codegen" } +weld-codegen = { version = "0.6.0", path = 
"../codegen" } diff --git a/rpc-rs/Makefile b/rpc-rs/Makefile index e18bf38..1017ad4 100644 --- a/rpc-rs/Makefile +++ b/rpc-rs/Makefile @@ -21,7 +21,7 @@ test:: docker stop wasmbus-rpc-test else test:: - cargo test -- --nocapture + WASMBUS_RPC_TIMEOUT_MS=4000 cargo test -- --nocapture cargo clippy --all-features --all-targets rustfmt --edition 2021 --check src/*.rs endif diff --git a/rpc-rs/examples/sub.rs b/rpc-rs/examples/sub.rs index 30a132a..453332a 100644 --- a/rpc-rs/examples/sub.rs +++ b/rpc-rs/examples/sub.rs @@ -8,14 +8,14 @@ use wasmbus_rpc::rpc_client::RpcClient; /// RpcClient test CLI for connection and subscription #[derive(Parser)] -#[clap(version, about, long_about = None)] +#[command(version, about, long_about = None)] struct Args { /// Nats uri. Defaults to 'nats://127.0.0.1:4222' - #[clap(short, long)] + #[arg(short, long)] nats: Option, /// Subject (topic) - #[clap(value_parser)] + #[arg(value_parser)] subject: String, } diff --git a/rpc-rs/src/cbor.rs b/rpc-rs/src/cbor.rs index 5669891..0c0d3c6 100644 --- a/rpc-rs/src/cbor.rs +++ b/rpc-rs/src/cbor.rs @@ -192,13 +192,10 @@ pub struct Encoder { } pub fn vec_encoder(header: bool) -> Encoder> { - let buf = if header { - let mut buf = Vec::new(); + let mut buf = Vec::new(); + if header { crate::common::MessageFormat::Cbor.write_header(&mut buf).unwrap(); - buf - } else { - Vec::new() - }; + } Encoder { inner: minicbor::Encoder::new(buf) } } diff --git a/rpc-rs/src/chunkify.rs b/rpc-rs/src/chunkify.rs index ffb399b..49d782f 100644 --- a/rpc-rs/src/chunkify.rs +++ b/rpc-rs/src/chunkify.rs @@ -19,26 +19,25 @@ use std::{ collections::HashMap, - io::Read, + marker::Unpin, sync::{Arc, RwLock}, }; -use nats::{ - jetstream::JetStream, +use async_nats::jetstream::{ + self, object_store::{Config, ObjectStore}, - JetStreamOptions, + Context, }; +use futures::TryFutureExt; use once_cell::sync::OnceCell; +use tokio::io::{AsyncRead, AsyncReadExt}; use tracing::{debug, error, instrument}; -use crate::{ - 
error::{RpcError, RpcResult}, - provider_main::get_host_bridge, -}; +use crate::error::{RpcError, RpcResult}; /// Maximum size of a message payload before it will be chunked /// Nats currently uses 128kb chunk size so this should be at least 128KB -const CHUNK_THRESHOLD_BYTES: usize = 1024 * 700; // 700KB +const CHUNK_THRESHOLD_BYTES: usize = 1024 * 900; // 900KB /// check if message payload needs to be chunked pub(crate) fn needs_chunking(payload_size: usize) -> bool { @@ -46,7 +45,7 @@ pub(crate) fn needs_chunking(payload_size: usize) -> bool { } /// map from lattice to ObjectStore - includes nats client connection -type JsMap = HashMap; +type JsMap = HashMap; fn jetstream_map() -> Arc> { static INSTANCE: OnceCell>> = OnceCell::new(); @@ -64,21 +63,40 @@ pub(crate) fn shutdown() { #[derive(Clone)] pub struct ChunkEndpoint { lattice: String, - js: JetStream, + js: Context, } impl ChunkEndpoint { - pub fn new(lattice: String, js: JetStream) -> Self { + pub fn new(lattice: String, js: Context) -> Self { ChunkEndpoint { lattice, js } } + pub(crate) fn with_client( + lattice: String, + nc: async_nats::Client, + domain: Option, + ) -> Self { + let map = jetstream_map(); + let mut _w = map.write().unwrap(); // panics if lock is poisoned + let js = _w.get(&lattice).cloned().unwrap_or_else(|| { + let js = if let Some(domain) = domain { + jetstream::with_domain(nc, domain) + } else { + jetstream::new(nc) + }; + _w.insert(lattice.clone(), js.clone()); + js + }); + ChunkEndpoint::new(lattice, js) + } + /// load the message after de-chunking #[instrument(level = "trace", skip(self))] - pub fn get_unchunkified(&self, inv_id: &str) -> RpcResult> { + pub async fn get_unchunkified(&self, inv_id: &str) -> RpcResult> { let mut result = Vec::new(); - let store = self.create_or_reuse_store()?; + let store = self.create_or_reuse_store().await?; debug!(invocation_id = %inv_id, "chunkify starting to receive"); - let mut obj = store.get(inv_id).map_err(|e| { + let mut obj = 
store.get(inv_id).await.map_err(|e| { RpcError::Nats(format!( "error starting to receive chunked stream for inv {}:{}", inv_id, e @@ -90,8 +108,9 @@ impl ChunkEndpoint { "error receiving chunked stream for inv {}:{}", inv_id, e )) - })?; - if let Err(e) = store.delete(inv_id) { + }) + .await?; + if let Err(e) = store.delete(inv_id).await { // not deleting will be a non-fatal error for the receiver, // if all the bytes have been received error!(invocation_id = %inv_id, error = %e, "deleting chunks for inv"); @@ -100,18 +119,23 @@ impl ChunkEndpoint { } /// load response after de-chunking - pub fn get_unchunkified_response(&self, inv_id: &str) -> RpcResult> { + pub async fn get_unchunkified_response(&self, inv_id: &str) -> RpcResult> { // responses are stored in the object store with '-r' suffix on the object name - self.get_unchunkified(&format!("{}-r", inv_id)) + self.get_unchunkified(&format!("{}-r", inv_id)).await } /// chunkify a message #[instrument(level = "trace", skip(self, bytes))] - pub fn chunkify(&self, inv_id: &str, bytes: &mut impl Read) -> RpcResult<()> { - let store = self.create_or_reuse_store()?; + pub async fn chunkify( + &self, + inv_id: &str, + mut bytes: (impl AsyncRead + Unpin), + ) -> RpcResult<()> { + let store = self.create_or_reuse_store().await?; debug!(invocation_id = %inv_id, "chunkify starting to send"); let info = store - .put(inv_id, bytes) + .put(inv_id, &mut bytes) + .await .map_err(|e| RpcError::Nats(format!("writing chunkified for {}: {}", inv_id, e)))?; debug!(?info, invocation_id = %inv_id, "chunkify completed writing"); @@ -119,47 +143,26 @@ impl ChunkEndpoint { } /// chunkify a portion of a response - pub fn chunkify_response(&self, inv_id: &str, bytes: &mut impl Read) -> Result<(), RpcError> { - self.chunkify(&format!("{}-r", inv_id), bytes) + pub async fn chunkify_response( + &self, + inv_id: &str, + bytes: &mut (impl AsyncRead + Unpin), + ) -> Result<(), RpcError> { + self.chunkify(&format!("{}-r", inv_id), bytes).await } 
- fn create_or_reuse_store(&self) -> RpcResult { - let store = match self.js.object_store(&self.lattice) { + async fn create_or_reuse_store(&self) -> RpcResult { + let store = match self.js.get_object_store(&self.lattice).await { Ok(store) => store, Err(_) => self .js - .create_object_store(&Config { + .create_object_store(Config { bucket: self.lattice.clone(), ..Default::default() }) + .await .map_err(|e| RpcError::Nats(format!("Failed to create store: {}", &e)))?, }; Ok(store) } } - -pub(crate) fn chunkify_endpoint( - domain: Option, - lattice: String, -) -> RpcResult { - let js = connect_js(domain, &lattice)?; - Ok(ChunkEndpoint::new(lattice, js)) -} - -pub(crate) fn connect_js(domain: Option, lattice_prefix: &str) -> RpcResult { - let map = jetstream_map(); - let mut _w = map.write().unwrap(); // panics if lock is poisioned - let js: JetStream = if let Some(js) = _w.get(lattice_prefix) { - js.clone() - } else { - let nc = get_host_bridge().new_sync_client()?; - let mut jsoptions = JetStreamOptions::new(); - if let Some(domain) = domain { - jsoptions = jsoptions.domain(domain.as_str()); - } - let js = JetStream::new(nc, jsoptions); - _w.insert(lattice_prefix.to_string(), js.clone()); - js - }; - Ok(js) -} diff --git a/rpc-rs/src/otel.rs b/rpc-rs/src/otel.rs index c81fe57..0e10635 100644 --- a/rpc-rs/src/otel.rs +++ b/rpc-rs/src/otel.rs @@ -18,29 +18,41 @@ lazy_static::lazy_static! 
{ #[derive(Debug)] pub struct OtelHeaderExtractor<'a> { inner: &'a HeaderMap, + keys: Vec, } impl<'a> OtelHeaderExtractor<'a> { /// Creates a new extractor using the given [`HeaderMap`] pub fn new(headers: &'a HeaderMap) -> Self { - OtelHeaderExtractor { inner: headers } + OtelHeaderExtractor { + inner: headers, + keys: headers + .iter() + .map(|(k, _)| String::from_utf8_lossy(k.as_ref()).to_string()) + .collect(), + } } /// Creates a new extractor using the given message pub fn new_from_message(msg: &'a async_nats::Message) -> Self { + let inner = msg.headers.as_ref().unwrap_or(&EMPTY_HEADERS); OtelHeaderExtractor { - inner: msg.headers.as_ref().unwrap_or(&EMPTY_HEADERS), + inner, + keys: inner + .iter() + .map(|(k, _)| String::from_utf8_lossy(k.as_ref()).to_string()) + .collect(), } } } impl<'a> Extractor for OtelHeaderExtractor<'a> { fn get(&self, key: &str) -> Option<&str> { - self.inner.get(key).and_then(|s| s.to_str().ok()) + self.inner.get(key).and_then(|s| s.iter().next().map(|s| s.as_str())) } fn keys(&self) -> Vec<&str> { - self.inner.keys().map(|s| s.as_str()).collect() + self.keys.iter().map(|k| k.as_str()).collect() } } @@ -87,19 +99,7 @@ impl OtelHeaderInjector { impl Injector for OtelHeaderInjector { fn set(&mut self, key: &str, value: String) { - // NOTE: Because the underlying headers are an http header, we are going to escape any - // unicode values and non-printable ASCII chars, which sounds better than just silently - // ignoring or using an empty string. Unfortunately this adds an extra allocation that is - // probably ok for now as it is freed at the end, but I prefer telemetry stuff to be as - // little overhead as possible. 
If anyone has a better idea of how to handle this, please PR - // it in - let header_name = key.escape_default().to_string().into_bytes(); - let escaped = value.escape_default().to_string().into_bytes(); - // SAFETY: All chars escaped above - self.inner.insert( - async_nats::header::HeaderName::from_bytes(&header_name).unwrap(), - async_nats::HeaderValue::from_bytes(&escaped).unwrap(), - ); + self.inner.insert(key, value.as_ref()); } } diff --git a/rpc-rs/src/provider.rs b/rpc-rs/src/provider.rs index 195c07a..336918f 100644 --- a/rpc-rs/src/provider.rs +++ b/rpc-rs/src/provider.rs @@ -151,33 +151,6 @@ pub struct HostBridge { } impl HostBridge { - #[cfg(not(target_arch = "wasm32"))] - pub(crate) fn new_sync_client(&self) -> RpcResult { - let nats_addr = if !self.host_data.lattice_rpc_url.is_empty() { - self.host_data.lattice_rpc_url.as_str() - } else { - DEFAULT_NATS_ADDR - }; - let nats_opts = match ( - self.host_data.lattice_rpc_user_jwt.trim(), - self.host_data.lattice_rpc_user_seed.trim(), - ) { - ("", "") => nats::Options::default(), - (rpc_jwt, rpc_seed) => { - let kp = nkeys::KeyPair::from_seed(rpc_seed).unwrap(); - let jwt = rpc_jwt.to_owned(); - nats::Options::with_jwt( - move || Ok(jwt.to_owned()), - move |nonce| kp.sign(nonce).unwrap(), - ) - } - }; - // Connect to nats - nats_opts.max_reconnects(None).connect(nats_addr).map_err(|e| { - RpcError::ProviderInit(format!("nats connection to {} failed: {}", nats_addr, e)) - }) - } - pub(crate) fn new_client( nats: async_nats::Client, host_data: &HostData, @@ -192,7 +165,7 @@ impl HostBridge { let rpc_client = RpcClient::new_client( nats, host_data.host_id.clone(), - host_data.default_rpc_timeout_ms.map(|ms| Duration::from_millis(ms as u64)), + host_data.default_rpc_timeout_ms.map(Duration::from_millis), key.clone(), ); @@ -221,6 +194,11 @@ impl HostBridge { pub fn link_name(&self) -> &str { self.host_data.link_name.as_str() } + + /// returns the lattice id + pub fn lattice_prefix(&self) -> &str { + 
&self.host_data.lattice_rpc_prefix + } } impl Deref for HostBridge { @@ -264,7 +242,7 @@ impl std::fmt::Debug for HostBridge { impl HostBridge { /// Returns a reference to the rpc client - fn rpc_client(&self) -> &RpcClient { + pub(crate) fn rpc_client(&self) -> &RpcClient { &self.rpc_client } @@ -379,10 +357,7 @@ impl HostBridge { break; }, nats_msg = sub.next() => { - let msg = match nats_msg { - None => break, - Some(msg) => msg - }; + let msg = if let Some(msg) = nats_msg { msg } else { break; }; let this = this.clone(); let provider = provider.clone(); let lattice = lattice.clone(); @@ -402,7 +377,6 @@ impl HostBridge { crate::otel::attach_span_context(&msg); match crate::common::deserialize::(&msg.payload) { Ok(inv) => { - let inv_id = inv.id.clone(); let current = tracing::Span::current(); current.record("operation", &tracing::field::display(&inv.operation)); current.record("lattice_id", &tracing::field::display(&lattice)); @@ -413,13 +387,19 @@ impl HostBridge { current.record("contract_id", &tracing::field::display(&inv.target.contract_id)); current.record("link_name", &tracing::field::display(&inv.target.link_name)); current.record("payload_size", &tracing::field::display(&inv.content_length.unwrap_or_default())); - let provider = provider.clone(); - let resp = match this.handle_rpc(provider, inv).in_current_span().await { + #[cfg(feature = "prometheus")] + { + if let Some(len) = inv.content_length { + this.rpc_client.stats.rpc_recv_bytes.inc_by(len); + } + this.rpc_client.stats.rpc_recv.inc(); + } + let inv_id = inv.id.clone(); + let resp = match this.handle_rpc(provider.clone(), inv).in_current_span().await { Err(error) => { - error!( - %error, - "Invocation failed" - ); + error!(%error, "Invocation failed"); + #[cfg(feature = "prometheus")] + this.rpc_client.stats.rpc_recv_err.inc(); InvocationResponse{ invocation_id: inv_id, error: Some(error.to_string()), @@ -427,6 +407,8 @@ impl HostBridge { } }, Ok(bytes) => { + #[cfg(feature = "prometheus")] + 
this.rpc_client.stats.rpc_recv_resp_bytes.inc_by(bytes.len() as u64); InvocationResponse{ invocation_id: inv_id, content_length: Some(bytes.len() as u64), @@ -438,7 +420,7 @@ impl HostBridge { if let Some(reply) = msg.reply { // send reply if let Err(error) = this.rpc_client() - .publish_invocation_response(reply, resp, &lattice).in_current_span().await { + .publish_invocation_response(reply, resp, &lattice).in_current_span().await { error!(%error, "rpc sending response"); } } @@ -453,7 +435,7 @@ impl HostBridge { }, &lattice ).in_current_span().await { - error!(error = %e, "unable to publish error message to invocation response"); + error!(error = %e, "unable to publish invocation response error"); } } } @@ -481,8 +463,7 @@ impl HostBridge { let inv = self.rpc_client().dechunk(inv, lattice).await?; let (inv, claims) = self.rpc_client.validate_invocation(inv).await?; self.validate_provider_invocation(&inv, &claims).await?; - - let rc = provider + provider .dispatch( &Context { actor: Some(inv.origin.public_key.clone()), @@ -494,18 +475,7 @@ impl HostBridge { }, ) .instrument(tracing::debug_span!("dispatch", public_key = %inv.origin.public_key, operation = %inv.operation)) - .await; - - #[cfg(feature = "prometheus")] - match &rc { - Err(_) => { - self.rpc_client.stats.rpc_recv_err.inc(); - } - Ok(vec) => { - self.rpc_client.stats.rpc_recv_resp_bytes.inc_by(vec.len() as u64); - } - } - rc + .await } async fn subscribe_shutdown

( @@ -540,16 +510,19 @@ impl HostBridge { let shutmsg: ShutdownMessage = serde_json::from_slice(&payload).unwrap_or_default(); // Backwards compatibility - if no host (or payload) is supplied, default // to shutting down unconditionally + if shutmsg.host_id.is_empty() { + warn!("Please upgrade your wasmcloud host to >= 0.59.0 if you use lattices with multiple hosts.") + } if shutmsg.host_id == self.host_data.host_id || shutmsg.host_id.is_empty() { info!("Received termination signal and stopping"); // Tell provider to shutdown - before we shut down nats subscriptions, // in case it needs to do any message passing during shutdown - if let Err(e) = provider.shutdown().await { - error!(error = %e, "got error during provider shutdown processing"); + if let Err(error) = provider.shutdown().await { + error!(%error, "got error during provider shutdown processing"); } let data = b"shutting down".to_vec(); - if let Err(e) = self.rpc_client().publish(reply_to, data).await { - error!(error = %e, "failed to send shutdown ack"); + if let Err(error) = self.rpc_client().publish(reply_to, data).await { + warn!(%error, "failed to send shutdown ack"); } // unsubscribe from shutdown topic let _ = sub.unsubscribe().await; @@ -666,16 +639,13 @@ impl HostBridge { let this = self.clone(); process_until_quit!(sub, quit, msg, { let arg = HealthCheckRequest {}; - let resp = match provider.health_request(&arg).await { - Ok(resp) => resp, - Err(e) => { - error!(error = %e, "error generating health check response"); - HealthCheckResponse { - healthy: false, - message: Some(e.to_string()), - } + let resp = provider.health_request(&arg).await.unwrap_or_else(|e| { + error!(error = %e, "error generating health check response"); + HealthCheckResponse { + healthy: false, + message: Some(e.to_string()), } - }; + }); let buf = if this.host_data.is_test() { Ok(serde_json::to_vec(&resp).unwrap()) } else { @@ -752,7 +722,7 @@ impl<'send> ProviderTransport<'send> { bridge .host_data .default_rpc_timeout_ms 
- .map(|t| Duration::from_millis(t as u64)) + .map(Duration::from_millis) .unwrap_or(DEFAULT_RPC_TIMEOUT_MILLIS) })); Self { bridge, ld, timeout } @@ -778,7 +748,7 @@ impl<'send> Transport for ProviderTransport<'send> { self.bridge .host_data .default_rpc_timeout_ms - .map(|t| Duration::from_millis(t as u64)) + .map(Duration::from_millis) .unwrap_or(DEFAULT_RPC_TIMEOUT_MILLIS) } }; diff --git a/rpc-rs/src/provider_main.rs b/rpc-rs/src/provider_main.rs index f4c2bd9..6f07dbb 100644 --- a/rpc-rs/src/provider_main.rs +++ b/rpc-rs/src/provider_main.rs @@ -6,7 +6,7 @@ use std::str::FromStr; use once_cell::sync::OnceCell; #[cfg(feature = "otel")] use opentelemetry::sdk::{ - trace::{self, IdGenerator, Sampler}, + trace::{self, RandomIdGenerator, Sampler}, Resource, }; #[cfg(feature = "otel")] @@ -68,14 +68,17 @@ static BRIDGE: OnceCell = OnceCell::new(); // this may be called any time after initialization pub fn get_host_bridge() -> &'static HostBridge { - match BRIDGE.get() { - Some(b) => b, - None => { - // initialized first thing, so this shouldn't happen - eprintln!("BRIDGE not initialized"); - panic!(); - } - } + BRIDGE.get().unwrap_or_else(|| { + // initialized first thing, so this shouldn't happen + eprintln!("BRIDGE not initialized"); + panic!(); + }) +} + +// like get_host_bridge but doesn't panic if it's not initialized +// This could be a valid condition if RpcClient is used outside capability providers +pub fn get_host_bridge_safe() -> Option<&'static HostBridge> { + BRIDGE.get() } #[doc(hidden)] @@ -325,7 +328,7 @@ fn configure_tracing(provider_name: String, structured_logging_enabled: bool) { .with_trace_config( trace::config() .with_sampler(Sampler::AlwaysOn) - .with_id_generator(IdGenerator::default()) + .with_id_generator(RandomIdGenerator::default()) .with_max_events_per_span(64) .with_max_attributes_per_span(16) .with_max_events_per_span(16) @@ -370,11 +373,8 @@ fn get_log_layer(structured_logging_enabled: bool) -> impl Layer EnvFilter { - match 
EnvFilter::try_from_default_env() { - Ok(f) => f, - Err(e) => { - eprintln!("RUST_LOG was not set or the given directive was invalid: {:?}\nDefaulting logger to `info` level", e); - EnvFilter::default().add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) - } - } + EnvFilter::try_from_default_env().unwrap_or_else(|e| { + eprintln!("RUST_LOG was not set or the given directive was invalid: {:?}\nDefaulting logger to `info` level", e); + EnvFilter::default().add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + }) } diff --git a/rpc-rs/src/rpc_client.rs b/rpc-rs/src/rpc_client.rs index 2dea0ad..2a6dbcc 100644 --- a/rpc-rs/src/rpc_client.rs +++ b/rpc-rs/src/rpc_client.rs @@ -18,13 +18,13 @@ use tracing::{debug, error, info, instrument, trace, warn}; #[cfg(feature = "otel")] use crate::otel::OtelHeaderInjector; - -use crate::wascap::{jwt, prelude::Claims}; use crate::{ - chunkify, + chunkify::{needs_chunking, ChunkEndpoint}, common::Message, core::{Invocation, InvocationResponse, WasmCloudEntity}, error::{RpcError, RpcResult}, + provider_main::get_host_bridge_safe, + wascap::{jwt, prelude::Claims}, }; pub(crate) const DEFAULT_RPC_TIMEOUT_MILLIS: Duration = Duration::from_millis(2000); @@ -256,7 +256,7 @@ impl RpcClient { } /// request or publish an rpc invocation - #[instrument(level = "debug", skip(self, origin, target, message), fields(issuer = tracing::field::Empty, origin_url = tracing::field::Empty, inv_id = tracing::field::Empty, target_url = tracing::field::Empty, method = tracing::field::Empty, provider_id = tracing::field::Empty))] + #[instrument(level = "debug", skip(self, origin, target, message), fields( provider_id = tracing::field::Empty, method = tracing::field::Empty, lattice_id = tracing::field::Empty, subject = tracing::field::Empty, issuer = tracing::field::Empty, sender_key = tracing::field::Empty, contract_id = tracing::field::Empty, link_name = tracing::field::Empty, target_key = tracing::field::Empty ))] async fn 
inner_rpc( &self, origin: WasmCloudEntity, @@ -279,13 +279,25 @@ impl RpcClient { // Record all of the fields on the span. To avoid extra allocations, we are only going to // record here after we generate/derive the values let span = tracing::span::Span::current(); - span.record("provider_id", &tracing::field::display(&issuer)); + if let Some(hb) = get_host_bridge_safe() { + span.record("provider_id", &tracing::field::display(&hb.provider_key())); + } span.record("method", &tracing::field::display(&message.method)); span.record("lattice_id", &tracing::field::display(&lattice)); - span.record("target_id", &tracing::field::display(&target.public_key)); span.record("subject", &tracing::field::display(&subject)); span.record("issuer", &tracing::field::display(&issuer)); - + if !origin.public_key.is_empty() { + span.record("sender_key", &tracing::field::display(&origin.public_key)); + } + if !target.contract_id.is_empty() { + span.record("contract_id", &tracing::field::display(&target.contract_id)); + } + if !target.link_name.is_empty() { + span.record("link_name", &tracing::field::display(&target.link_name)); + } + if !target.public_key.is_empty() { + span.record("target_key", &tracing::field::display(&target.public_key)); + } //debug!("rpc_client sending"); let claims = Claims::::new( issuer.clone(), @@ -298,7 +310,7 @@ impl RpcClient { let topic = rpc_topic(&target, lattice); let method = message.method.to_string(); let len = message.arg.len(); - let chunkify = chunkify::needs_chunking(len); + let chunkify = needs_chunking(len); let (invocation, body) = { let mut inv = Invocation { @@ -324,12 +336,9 @@ impl RpcClient { debug!(invocation_id = %inv_id, %len, "chunkifying invocation"); // start chunking thread let lattice = lattice.to_string(); - if let Err(error) = tokio::task::spawn_blocking(move || { - let ce = chunkify::chunkify_endpoint(None, lattice)?; - ce.chunkify(&inv_id, &mut body.as_slice()) - }) - .await - .map_err(|join_e| 
RpcError::Other(join_e.to_string()))? + if let Err(error) = ChunkEndpoint::with_client(lattice, self.client(), None) + .chunkify(&inv_id, &mut body.as_slice()) + .await { error!(%error, "chunking error"); return Err(RpcError::Other(error.to_string())); @@ -387,10 +396,8 @@ impl RpcClient { match inv_response.error { None => { #[cfg(feature = "prometheus")] - { - if let Some(len) = inv_response.content_length { - self.stats.rpc_sent_resp_bytes.inc_by(len); - } + if let Some(len) = inv_response.content_length { + self.stats.rpc_sent_resp_bytes.inc_by(len); } // was response chunked? let msg = if inv_response.content_length.is_some() @@ -401,12 +408,9 @@ impl RpcClient { { self.stats.rpc_sent_resp_chunky.inc(); } - tokio::task::spawn_blocking(move || { - let ce = chunkify::chunkify_endpoint(None, lattice)?; - ce.get_unchunkified_response(&inv_response.invocation_id) - }) - .await - .map_err(|je| RpcError::Other(format!("join/resp-chunk: {}", je)))?? + ChunkEndpoint::with_client(lattice, self.client(), None) + .get_unchunkified_response(&inv_response.invocation_id) + .await? } else { inv_response.msg }; @@ -480,6 +484,10 @@ impl RpcClient { }) .await?; let nc = self.client(); + // TODO: revisit after doing some performance tuning and review of callers of pubish(). + // For high throughput use cases, it may be better to change the flush interval timer + // instead of flushing after every publish. + // Flushing here is good for low traffic use cases when optimizing for latency. 
tokio::spawn(async move { if let Err(error) = nc.flush().await { error!(%error, "flush after publish"); @@ -493,25 +501,19 @@ impl RpcClient { reply_to: String, response: InvocationResponse, lattice: &str, - ) -> Result<(), String> { + ) -> RpcResult<()> { let content_length = Some(response.msg.len() as u64); let response = { let inv_id = response.invocation_id.clone(); - if chunkify::needs_chunking(response.msg.len()) { + if needs_chunking(response.msg.len()) { #[cfg(feature = "prometheus")] { self.stats.rpc_recv_resp_chunky.inc(); } - let msg = response.msg; - let lattice = lattice.to_string(); - tokio::task::spawn_blocking(move || { - let ce = chunkify::chunkify_endpoint(None, lattice) - .map_err(|e| format!("connecting for chunkifying: {}", &e.to_string()))?; - ce.chunkify_response(&inv_id, &mut msg.as_slice()) - .map_err(|e| e.to_string()) - }) - .await - .map_err(|je| format!("join/response-chunk: {}", je))??; + let buf = response.msg; + ChunkEndpoint::with_client(lattice.to_string(), self.client(), None) + .chunkify_response(&inv_id, &mut buf.as_slice()) + .await?; InvocationResponse { msg: Vec::new(), content_length, @@ -523,27 +525,12 @@ impl RpcClient { }; match crate::common::serialize(&response) { - Ok(t) => { - if let Err(e) = self.client().publish(reply_to.clone(), t.into()).await { - error!( - %reply_to, - error = %e, - "failed sending rpc response", - ); - } - let nc = self.client(); - tokio::spawn(async move { - if let Err(error) = nc.flush().await { - error!(%error, "flush after publishing invocation response"); - } - }); - } + Ok(t) => Ok(self.publish(reply_to, t).await?), Err(e) => { // extremely unlikely that InvocationResponse would fail to serialize - error!(error = %e, "failed serializing InvocationResponse"); + Err(RpcError::Ser(format!("InvocationResponse: {}", e))) } } - Ok(()) } pub async fn dechunk(&self, mut inv: Invocation, lattice: &str) -> RpcResult { @@ -552,15 +539,10 @@ impl RpcClient { { self.stats.rpc_recv_chunky.inc(); } - let 
inv_id = inv.id.clone(); - let lattice = lattice.to_string(); - inv.msg = tokio::task::spawn_blocking(move || { - let ce = chunkify::chunkify_endpoint(None, lattice) - .map_err(|e| format!("connecting for de-chunkifying: {}", &e.to_string()))?; - ce.get_unchunkified(&inv_id).map_err(|e| e.to_string()) - }) - .await - .map_err(|je| format!("join/dechunk-validate: {}", je))??; + inv.msg = ChunkEndpoint::with_client(lattice.to_string(), self.client(), None) + .get_unchunkified(&inv.id.clone()) + .await + .map_err(|e| e.to_string())?; } Ok(inv) } @@ -642,8 +624,8 @@ pub fn with_connection_event_logging( use async_nats::Event; opts.event_callback(|event| async move { match event { - Event::Disconnect => warn!("nats client disconnected"), - Event::Reconnect => info!("nats client reconnected"), + Event::Disconnected => warn!("nats client disconnected"), + Event::Connected => info!("nats client connected"), Event::ClientError(err) => error!("nats client error: '{:?}'", err), Event::ServerError(err) => error!("nats server error: '{:?}'", err), Event::SlowConsumer(val) => warn!("nats slow consumer detected ({})", val), diff --git a/rpc-rs/src/timestamp.rs b/rpc-rs/src/timestamp.rs index fb88eec..df0d53d 100644 --- a/rpc-rs/src/timestamp.rs +++ b/rpc-rs/src/timestamp.rs @@ -110,7 +110,7 @@ impl From for Timestamp { .expect("system time before Unix epoch"); Timestamp { sec: d.as_secs() as i64, - nsec: d.subsec_nanos() as u32, + nsec: d.subsec_nanos(), } } } diff --git a/rpc-rs/tests/nats_sub.rs b/rpc-rs/tests/nats_sub.rs index af72094..262f4e8 100644 --- a/rpc-rs/tests/nats_sub.rs +++ b/rpc-rs/tests/nats_sub.rs @@ -3,24 +3,21 @@ use std::{str::FromStr, sync::Arc, time::Duration}; -use tracing::{debug, error}; +use test_log::test; +use tracing::{debug, error, info}; use wascap::prelude::KeyPair; use wasmbus_rpc::{ error::{RpcError, RpcResult}, - rpc_client::RpcClient, + rpc_client::{with_connection_event_logging, RpcClient}, }; const ONE_SEC: Duration = Duration::from_secs(1); 
-const THREE_SEC: Duration = Duration::from_secs(3); +const FIVE_SEC: Duration = Duration::from_secs(5); const TEST_NATS_ADDR: &str = "nats://127.0.0.1:4222"; const HOST_ID: &str = "HOST_test_nats_sub"; fn nats_url() -> String { - if let Ok(addr) = std::env::var("NATS_URL") { - addr - } else { - TEST_NATS_ADDR.to_string() - } + std::env::var("NATS_URL").unwrap_or_else(|_| TEST_NATS_ADDR.into()) } fn is_demo() -> bool { @@ -32,7 +29,7 @@ fn is_demo() -> bool { async fn make_client(timeout: Option) -> RpcResult { let nats_url = nats_url(); let server_addr = async_nats::ServerAddr::from_str(&nats_url).unwrap(); - let nc = async_nats::ConnectOptions::default() + let nc = with_connection_event_logging(async_nats::ConnectOptions::default()) .connect(server_addr) .await .map_err(|e| { @@ -59,7 +56,7 @@ async fn listen(client: RpcClient, subject: &str, pattern: &str) -> tokio::task: while let Some(msg) = sub.next().await { let payload = String::from_utf8_lossy(&msg.payload); if !pattern.is_match(payload.as_ref()) && &payload != "exit" { - println!("ERROR: payload on {}: {}", subject, &payload); + error!("payload on {}: {}", subject, &payload); } if let Some(reply_to) = msg.reply { client.publish(reply_to, b"ok".to_vec()).await.expect("reply"); @@ -69,7 +66,7 @@ async fn listen(client: RpcClient, subject: &str, pattern: &str) -> tokio::task: } count += 1; } - println!("received {} message(s)", count); + info!("listener received {} message(s)", count); count }) } @@ -96,7 +93,7 @@ async fn listen_bin(client: RpcClient, subject: &str) -> tokio::task::JoinHandle } } let _ = sub.unsubscribe().await; - debug!("listen_bin exiting with count {}", count); + info!("listen_bin exiting with count {}", count); count }) } @@ -123,7 +120,7 @@ async fn listen_queue( while let Some(msg) = sub.next().await { let payload = String::from_utf8_lossy(&msg.payload); if !pattern.is_match(payload.as_ref()) && &payload != "exit" { - debug!("ERROR: payload on {}: {}", &subject, &payload); + 
error!("payload on {}: {}", &subject, &payload); break; } if let Some(reply_to) = msg.reply { @@ -136,12 +133,12 @@ async fn listen_queue( } count += 1; } - println!("subscriber '{}' exiting count={}", &subject, count); + info!("subscriber '{}' exiting count={}", &subject, count); count }) } -#[tokio::test] +#[test(tokio::test(flavor = "multi_thread"))] async fn simple_sub() -> Result<(), Box> { // create unique subscription name for this test let sub_name = uuid::Uuid::new_v4().to_string(); @@ -159,23 +156,23 @@ async fn simple_sub() -> Result<(), Box> { } /// send large messages - this uses request() and does not test chunking -#[tokio::test] +#[test(tokio::test)] async fn test_message_size() -> Result<(), Box> { // create unique subscription name for this test let sub_name = uuid::Uuid::new_v4().to_string(); let topic = format!("bin_{}", &sub_name); - let l1 = listen_bin(make_client(Some(THREE_SEC)).await?, &topic).await; + let l1 = listen_bin(make_client(Some(FIVE_SEC)).await?, &topic).await; let mut pass_count = 0; - let sender = make_client(Some(THREE_SEC)).await.expect("creating bin sender"); + let sender = make_client(Some(FIVE_SEC)).await.expect("creating bin sender"); // messages sizes to test let test_sizes = if is_demo() { // if using 'demo.nats.io' as the test server, // don't abuse it by running this test with very large sizes // // The last size must be 1 to signal to listen_bin to exit - &[10u32, 25, 100, 200, 500, 1000, 1] + &[10u32, 100, 200, 500, 1000, 8000, 1] } else { // The last size must be 1 to signal to listen_bin to exit &[10u32, 25, 500, 10_000, 800_000, 1_000_000, 1] @@ -183,15 +180,14 @@ async fn test_message_size() -> Result<(), Box> { for size in test_sizes.iter() { let mut data = Vec::with_capacity(*size as usize); data.resize(*size as usize, 255u8); - let resp = match tokio::time::timeout(THREE_SEC, sender.request(topic.clone(), data)).await - { + let resp = match tokio::time::timeout(FIVE_SEC, sender.request(topic.clone(), 
data)).await { Ok(Ok(result)) => result, Ok(Err(rpc_err)) => { - eprintln!("send error on msg size {}: {}", *size, rpc_err); + error!("send error on msg size {}: {}", *size, rpc_err); continue; } Err(timeout_err) => { - eprintln!( + error!( "rpc timeout: sending msg of size {}: {}", *size, timeout_err ); @@ -201,10 +197,10 @@ async fn test_message_size() -> Result<(), Box> { let sbody = String::from_utf8_lossy(&resp); let received_size = sbody.parse::().expect("response contains int size"); if *size == received_size { - eprintln!("PASS: message_size: {}", size); + info!("PASS: message_size: {}", size); pass_count += 1; } else { - eprintln!("FAIL: message_size: {}, got: {}", size, received_size); + error!("FAIL: message_size: {}, got: {}", size, received_size); } } assert_eq!(pass_count, test_sizes.len(), "some size tests did not pass"); @@ -230,7 +226,7 @@ fn check_ok(data: Vec) -> Result<(), RpcError> { } } -#[tokio::test] +#[test(tokio::test)] async fn queue_sub() -> Result<(), Box> { // in this test, there are two queue subscribers. // on topic "one..." with the same queue group X,