From e6f079ee08a61af86788a4ecfe422b2302d23163 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 8 Jan 2025 12:55:39 -0500 Subject: [PATCH] The goblets of dawn are smashed. The weeping of the guitar begins. --- .github/CODEOWNERS | 2 + .github/workflows/rust-ci.yml | 11 + .gitignore | 3 + Cargo.toml | 65 ++++ LICENSE-APACHE | 176 +++++++++ LICENSE-MIT | 23 ++ README.md | 94 +++++ src/axum.rs | 34 ++ src/error.rs | 16 + src/lib.rs | 263 +++++++++++++ src/macros.rs | 89 +++++ src/primitives.rs | 90 +++++ src/pubsub/ipc.rs | 241 ++++++++++++ src/pubsub/mod.rs | 67 ++++ src/pubsub/shared.rs | 320 ++++++++++++++++ src/pubsub/trait.rs | 117 ++++++ src/pubsub/ws.rs | 117 ++++++ src/router.rs | 559 ++++++++++++++++++++++++++++ src/routes/ctx.rs | 77 ++++ src/routes/erased.rs | 153 ++++++++ src/routes/future.rs | 106 ++++++ src/routes/handler.rs | 676 ++++++++++++++++++++++++++++++++++ src/routes/method.rs | 82 +++++ src/routes/mod.rs | 132 +++++++ src/types/error.rs | 31 ++ src/types/mod.rs | 14 + src/types/req.rs | 205 +++++++++++ src/types/resp.rs | 418 +++++++++++++++++++++ 28 files changed, 4181 insertions(+) create mode 100644 .github/CODEOWNERS create mode 100644 .github/workflows/rust-ci.yml create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 LICENSE-APACHE create mode 100644 LICENSE-MIT create mode 100644 README.md create mode 100644 src/axum.rs create mode 100644 src/error.rs create mode 100644 src/lib.rs create mode 100644 src/macros.rs create mode 100644 src/primitives.rs create mode 100644 src/pubsub/ipc.rs create mode 100644 src/pubsub/mod.rs create mode 100644 src/pubsub/shared.rs create mode 100644 src/pubsub/trait.rs create mode 100644 src/pubsub/ws.rs create mode 100644 src/router.rs create mode 100644 src/routes/ctx.rs create mode 100644 src/routes/erased.rs create mode 100644 src/routes/future.rs create mode 100644 src/routes/handler.rs create mode 100644 src/routes/method.rs create mode 100644 src/routes/mod.rs create mode 100644 src/types/error.rs create mode 100644 src/types/mod.rs create mode 100644 src/types/req.rs create mode 100644 src/types/resp.rs diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..69f7dc9 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,2 @@ +*: @Evalir @prestwich @dylanlott +.github/: @rswanson \ No newline at end of file diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml new file mode 100644 index 0000000..66a8553 --- /dev/null +++ b/.github/workflows/rust-ci.yml @@ -0,0 +1,11 @@ +name: Rust CI + +on: + push: + branches: [main] + pull_request: + +# simplest example of using the rust-base action +jobs: + rust-base: + uses: init4tech/actions/.github/workflows/rust-base.yml@main diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..696419e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +*/.DS_Store +Cargo.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..48bad02 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,65 @@ +[package] +name = "ajj" +description = "Simple, modern, ergonomic JSON-RPC 2.0 server built with tower and axum" + +version = "0.1.0" +edition = "2021" +rust-version = "1.81" +authors = ["init4", "James Prestwich"] +license = "MIT OR Apache-2.0" +homepage = "https://github.com/init4tech/rpc" +repository = "https://github.com/init4tech/rpc" + +[dependencies] +bytes = "1.9.0" +pin-project = "1.1.8" +serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.135" +thiserror = "2.0.9" +tokio = { version = "1.43.0", features = ["sync", "rt", "macros"] } +tower = { version = "0.5.2", features = ["util"] } +tracing = "0.1.41" + +# axum +axum = { version = "0.8.1", optional = true } + +# pubsub +tokio-stream = { version = "0.1.17", optional = true } + +# ipc +interprocess = { version = "2.2.2", features = ["async", "tokio"], optional = true } +tokio-util = { version = "0.7.13", optional = true, features = ["io"] } + +# ws +tokio-tungstenite = { version = "0.26.1", features = ["rustls-tls-webpki-roots"], optional = true } +futures-util = { version = "0.3.31", optional = true } + +[features] +default = ["axum", "ws", "ipc"] +axum = ["dep:axum"] +pubsub = ["dep:tokio-stream"] +ipc = ["pubsub", "dep:tokio-util", "dep:interprocess"] +ws = ["pubsub", "dep:tokio-tungstenite", "dep:futures-util"] +tokio-util = ["dep:tokio-util"] + +[profile.release] +opt-level = 3 +lto = "thin" +debug = "line-tables-only" +strip = true +panic = "unwind" +codegen-units = 16 + +[profile.profiling] +inherits = "release" +debug = 2 +strip = false + +[profile.bench] +inherits = "profiling" + +[profile.ci-rust] +inherits = "dev" +strip = true +debug = false +incremental = false diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..1b5ec8b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,176 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..e701a9f --- /dev/null +++ b/README.md @@ -0,0 +1,94 @@ +# AJJ: A Json-RPC Router + +A general-purpose, batteries-included [JSON-RPC 2.0] router, inspired by axum's +routing system. + +## High-level features + +We aim to provide a + +- Copy-free routing of JSON-RPC requests to method handlers. +- No-boilerplate method handlers. Just write an async function. +- Support for pubsub-style notifications. +- Built-in support for axum, and tower's middleware and service ecosystem. +- Basic built-in pubsub server implementations for WS and IPC. + +## Concepts + +- A **Handler** is a function that processes some JSON input, and succeeds or + fails with some data. +- A **Method** is a handler with an associated name, which is used by the + incoming [request] to identify a specific **Handler**. +- A **Router** is a collection of **Methods**. +- A **Server** exposes a **Router** over some transport. + +## Basic Usage + +See the [crate documentation on docs.rs] for more detailed examples. + +```rust +use ajj::{Router}; + +// Provide methods called "double" and "add" to the router. +let router = Router::::new() + // "double" returns the double of the request's parameter. + .route("double", |params: u64| async move { + Ok::<_, ()>(params * 2) + }) + // "add" returns the sum of the request's parameters and the router's stored + // state. + .route("add", |params: u64, state: u64| async move { + Ok::<_, ()>(params + state) + }) + // The router is provided with state, and is now ready to handle requests. + .with_state::<()>(3u64); +``` + +## Feature flags + +- `axum` - implements the `tower::Service` trait for `Router`, allowing it to + be used as an [axum]() handler. +- `pubsub` - adds traits and tasks for serving the router over streaming + interfaces. +- `ws` - adds implementations of the `pubsub` traits for + [tokio-tungstenite]. +- `ipc` - adds implementations of the `pubsub` traits for + [interprocess `local_sockets`]. + +## Serving the Router + +We recommend [axum] for serving the router over HTTP. The `Router` provides an +`into_axum(path: &str)` method to instantiate a new [`axum::Router`], and +register the router to handle requests. + +For WS and IPC connections, the `pubsub` module provides implementations of the +[`Connect`] trait for [`std::net::SocketAddr`] to create simple WS servers, and +[`interprocess::local_socket::ListenerOptions`] to create simple IPC servers. +Users with more complex needs should provide their own [`Connect`] +implementations. + +See the [crate documentation on docs.rs] for more detailed examples. + +## Note on code provenance + +Some code in this project has been reproduced or adapted from other projects. +Files containing that code contain a note at the bottom of the file indicating +the original source, and containing relevant license information. Code has been +reproduced from the following projects, and we are grateful for their work: + +- [axum] - for the `Router` struct and `Handler` trait, as well as for the + internal method, route, and handler representations. +- [alloy] - for the `ResponsePayload` and associated structs, the `RpcSend` and + `RpcRecv` family of traits, and the `JsonReadStream` in the `ipc` module. + +[crate documentation on docs.rs]: https://docs.rs/ajj/latest/ajj/ +[`Connect`]: https://docs.rs/ajj/latest/ajj/pubsub/trait.Connect.html +[JSON-RPC 2.0]: https://www.jsonrpc.org/specification +[request]: https://www.jsonrpc.org/specification#request_object +[axum]: https://docs.rs/axum/latest/axum/ +[tokio-tungstenite]: https://docs.rs/tokio-tungstenite/latest/tokio_tungstenite/ +[`axum::Router`]: https://docs.rs/axum/latest/axum/struct.Router.html +[interprocess `local_sockets`]: https://docs.rs/interprocess/latest/interprocess/local_socket/tokio/index.html +[`interprocess::local_socket::ListenerOptions`]: https://docs.rs/interprocess/latest/interprocess/local_socket/struct.ListenerOptions.html +[std::net::SocketAddr]: https://doc.rust-lang.org/std/net/enum.SocketAddr.html +[alloy]: https://docs.rs/alloy/latest/alloy/ diff --git a/src/axum.rs b/src/axum.rs new file mode 100644 index 0000000..e11e271 --- /dev/null +++ b/src/axum.rs @@ -0,0 +1,34 @@ +use crate::{types::Response, HandlerArgs}; +use axum::{extract::FromRequest, response::IntoResponse}; +use bytes::Bytes; +use std::{future::Future, pin::Pin}; + +impl axum::handler::Handler for crate::Router +where + S: Clone + Send + Sync + 'static, +{ + type Future = Pin + Send>>; + + fn call(self, req: axum::extract::Request, state: S) -> Self::Future { + Box::pin(async move { + let Ok(bytes) = Bytes::from_request(req, &state).await else { + return Box::::from(Response::parse_error()).into_response(); + }; + + let Ok(req) = crate::types::Request::try_from(bytes) else { + return Box::::from(Response::parse_error()).into_response(); + }; + + let args = HandlerArgs { + ctx: Default::default(), + req, + }; + + // Default handler ctx does not allow for notifications, which is + // what we want over HTTP. + let response = unwrap_infallible!(self.call_with_state(args, state).await); + + Box::::from(response).into_response() + }) + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..4af04fa --- /dev/null +++ b/src/error.rs @@ -0,0 +1,16 @@ +use std::borrow::Cow; + +/// Errors that can occur when registering a method. +#[derive(Debug, Clone, thiserror::Error)] +pub enum RegistrationError { + /// Method name is already in use. + #[error("Method already registered: {0}")] + MethodAlreadyRegistered(Cow<'static, str>), +} + +impl RegistrationError { + /// Create a new `MethodAlreadyRegistered` error. + pub fn method_already_registered(name: impl Into>) -> Self { + Self::MethodAlreadyRegistered(name.into()) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..11e8759 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,263 @@ +//! AJJ: A JSON-RPC router inspired by axum's `Router`. +//! +//! This crate provides a way to define a JSON-RPC router that can be used to +//! route requests to handlers. It is inspired by the [`axum`] crate's +//! [`axum::Router`]. +//! +//! ## Basic usage +//! +//! The [`Router`] type is the main type provided by this crate. It is used to +//! register JSON-RPC methods and their handlers. +//! +//! ```no_run +//! use ajj::{Router, HandlerCtx, ResponsePayload}; +//! +//! # fn main() { +//! // Provide methods called "double" and "add" to the router. +//! let router = Router::::new() +//! .route("double", |params: u64| async move { +//! Ok::<_, ()>(params * 2) +//! }) +//! .route("add", |params: u64, state: u64| async move { +//! Ok::<_, ()>(params + state) +//! }) +//! // Routes get a ctx, which can be used to send notifications. +//! .route("notify", |ctx: HandlerCtx| async move { +//! if ctx.notifications().is_none() { +//! // This error will appear in the ResponsePayload's `data` field. +//! return Err("notifications are disabled"); +//! } +//! +//! let req_id = 15u8; +//! +//! tokio::task::spawn_blocking(move || { +//! // something expensive goes here +//! let result = 100_000_000; +//! let _ = ctx.notify(&serde_json::json!({ +//! "req_id": req_id, +//! "result": result, +//! })); +//! }); +//! Ok(req_id) +//! }) +//! .route("error_example", || async { +//! // This will appear in the ResponsePayload's `message` field. +//! ResponsePayload::<(), ()>::internal_error_message("this is an error".into()) +//! }) +//! // The router is provided with state, and is now ready to serve requests. +//! .with_state::<()>(3u64); +//! # } +//! ``` +//! +//! ## Handlers +//! +//! Methods are routed via the [`Handler`] trait, which is blanket implemented +//! for many async functions. [`Handler`] contain implement the logic executed +//! when calling methods on the JSON-RPC router. +//! +//! Handlers can return either +//! - `Result where T: Serialize, E: Serialize` +//! - `ResponsePayload where T: Serialize, E: Serialize` +//! +//! These types will be serialized into the JSON-RPC response. The `T` type +//! represents the result of the method, and the `E` type represents an error +//! response. The `E` type is optional, and can be set to `()` if no error +//! response is needed. +//! +//! See the [`Handler`] trait docs for more information. +//! +//! ## Serving the Router +//! +//! We recommend [`axum`] for serving the router over HTTP. When the `"axum"` +//! feature flag is enabled, The [`Router`] provides +//! `Router::into_axum(path: &str)` to instantiate a new [`axum::Router`], and +//! register the router to handle requests. You can then serve the +//! [`axum::Router`] as normal, or add additional routes to it. +//! +//! ```no_run +//! # #[cfg(feature = "axum")] +//! # { +//! # use ajj::{Router, HandlerCtx, ResponsePayload}; +//! # async fn _main(router: Router<()>) { +//! // Instantiate a new axum router, and register the JSON-RPC router to handle +//! // requests at the `/rpc` path, and serve it on port 3000. +//! let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); +//! axum::serve(listener, router.into_axum("/rpc")).await.unwrap(); +//! # }} +//! ``` +//! +//! For WS and IPC connections, the `pubsub` module provides implementations of +//! the `Connect` trait for [`std::net::SocketAddr`] to create simple WS +//! servers, and [`interprocess::local_socket::ListenerOptions`] to create +//! simple IPC servers. +//! +//! ```no_run +//! # #[cfg(feature = "pubsub")] +//! # { +//! # use ajj::{Router, pubsub::Connect}; +//! # async fn _main(router:Router<()>) { +//! // Serve the router over websockets on port 3000. +//! let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 3000)); +//! // The shutdown object will stop the server when dropped. +//! let shutdown = addr.run(router).await.unwrap(); +//! # }} +//! ``` +//! +#![cfg_attr( + feature = "pubsub", + doc = "See the [`pubsub`] module documentation for more information." +)] +//! +//! [`axum`]: https://docs.rs/axum/latest/axum/index.html +//! [`axum::Router`]: https://docs.rs/axum/latest/axum/routing/struct.Router.html +//! [`ResponsePayload`]: alloy::rpc::json_rpc::ResponsePayload + +#![warn( + missing_copy_implementations, + missing_debug_implementations, + missing_docs, + unreachable_pub, + clippy::missing_const_for_fn, + rustdoc::all +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +#[macro_use] +pub(crate) mod macros; + +#[cfg(feature = "axum")] +mod axum; + +mod error; +pub use error::RegistrationError; + +mod primitives; +pub use primitives::{BorrowedRpcObject, MethodId, RpcBorrow, RpcObject, RpcRecv, RpcSend}; + +#[cfg(feature = "pubsub")] +pub mod pubsub; + +mod routes; +pub(crate) use routes::{BoxedIntoRoute, ErasedIntoRoute, Method, Route}; +pub use routes::{Handler, HandlerArgs, HandlerCtx, NotifyError, RouteFuture}; + +mod router; +pub use router::Router; + +mod types; +pub use types::{ErrorPayload, ResponsePayload}; + +/// Re-export of the `tower` crate, primarily to provide [`tower::Service`], +/// and [`tower::service_fn`]. +pub use tower; + +/// Re-export of the `serde_json` crate, primarily to provide the `RawValue` type. +pub use serde_json::{self, value::RawValue}; + +#[cfg(test)] +pub(crate) mod test_utils { + use serde_json::value::RawValue; + + #[track_caller] + pub(crate) fn assert_rv_eq(a: &RawValue, b: &str) { + let left = serde_json::from_str::(a.get()).unwrap(); + let right = serde_json::from_str::(b).unwrap(); + assert_eq!(left, right); + } +} + +#[cfg(test)] +mod test { + + use crate::{ + router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, ResponsePayload, + }; + use bytes::Bytes; + use serde_json::value::RawValue; + use std::borrow::Cow; + + // more of an example really + #[tokio::test] + async fn example() { + let router: RouterInner<()> = RouterInner::new() + .route("hello_world", || async { + ResponsePayload::<(), _>::internal_error_with_message_and_obj( + Cow::Borrowed("Hello, world!"), + 30u8, + ) + }) + .route("foo", |a: Box, _state: u64| async move { + Ok::<_, ()>(a.get().to_owned()) + }) + .with_state(&3u64); + + let req = Bytes::from_static(r#"{"id":1,"method":"hello_world","params":[]}"#.as_bytes()); + + let res = router + .call_with_state( + HandlerArgs { + ctx: Default::default(), + req: req.try_into().unwrap(), + }, + (), + ) + .await + .expect("infallible"); + + assert_rv_eq( + &res, + r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32603,"message":"Hello, world!","data":30}}"#, + ); + + let req2 = Bytes::from_static(r#"{"id":1,"method":"foo","params":{}}"#.as_bytes()); + + let res2 = router + .call_with_state( + HandlerArgs { + ctx: Default::default(), + req: req2.try_into().unwrap(), + }, + (), + ) + .await + .expect("infallible"); + + assert_rv_eq(&res2, r#"{"jsonrpc":"2.0","id":1,"result":"{}"}"#); + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `axum` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/tokio-rs/axum/blob/f84105ae8b078109987b089c47febc3b544e6b80/axum/src/routing/mod.rs#L119 +// +// The MIT License (MIT) +// +// Copyright (c) 2019 Axum Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/macros.rs b/src/macros.rs new file mode 100644 index 0000000..105eaa4 --- /dev/null +++ b/src/macros.rs @@ -0,0 +1,89 @@ +/// Used by the [`Router`] to modify the type of the [`RouterInner`] and return +/// a new [`Router`]. +/// +/// [`Router`]: crate::Router +/// [`RouterInner`]: crate::router::RouterInner +macro_rules! map_inner { + ( $self_:ident, $inner:pat_param => $expr:expr) => { + #[allow(redundant_semicolons)] + { + let $inner = $self_.into_inner(); + Router { + inner: Arc::new($expr), + } + } + }; +} + +/// Used by the [`Router`] to access methods on the [`RouterInner`] without +/// modifying the inner type. +/// +/// [`Router`]: crate::Router +/// [`RouterInner`]: crate::router::RouterInner +macro_rules! tap_inner { + ( $self_:ident, mut $inner:ident => { $($stmt:stmt)* } ) => { + #[allow(redundant_semicolons)] + { + let mut $inner = $self_.into_inner(); + $($stmt)* + Router { + inner: Arc::new($inner), + } + } + }; +} + +/// Unwrap a result, panic with the `Display` of the error if it is an `Err`. +macro_rules! panic_on_err { + ($expr:expr) => { + match $expr { + Ok(x) => x, + Err(err) => panic!("{err}"), + } + }; +} + +/// Unwrap a result contianing an `Infallible`. +#[allow(unused_macros)] // used in some features +macro_rules! unwrap_infallible { + ($expr:expr) => { + match $expr { + Ok(x) => x, + Err(_) => unreachable!("Infallible"), + } + }; +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `axum` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/tokio-rs/axum/blob/f84105ae8b078109987b089c47febc3b544e6b80/axum/src/routing/mod.rs#L119 +// +// The MIT License (MIT) +// +// Copyright (c) 2019 Axum Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/primitives.rs b/src/primitives.rs new file mode 100644 index 0000000..6db5110 --- /dev/null +++ b/src/primitives.rs @@ -0,0 +1,90 @@ +use core::{ + fmt, + ops::{Add, AddAssign}, +}; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +/// A unique internal identifier for a method. +#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] +pub struct MethodId(usize); + +impl From for MethodId { + fn from(id: usize) -> Self { + Self(id) + } +} + +impl Add for MethodId { + type Output = Self; + + fn add(self, rhs: usize) -> Self::Output { + Self(self.0 + rhs) + } +} + +impl AddAssign for MethodId { + fn add_assign(&mut self, rhs: usize) { + self.0 += rhs; + } +} + +/// An object that can be sent over RPC. +/// +/// This marker trait is blanket-implemented for every qualifying type. It is +/// used to indicate that a type can be sent in the body of a JSON-RPC message. +pub trait RpcSend: Serialize + Clone + fmt::Debug + Send + Sync + Unpin {} + +impl RpcSend for T where T: Serialize + Clone + fmt::Debug + Send + Sync + Unpin {} + +/// An object that can be received over RPC. +/// +/// This marker trait is blanket-implemented for every qualifying type. It is +/// used to indicate that a type can be received in the body of a JSON-RPC +/// message. +/// +/// # Note +/// +/// We add the `'static` lifetime to the supertraits to indicate that the type +/// can't borrow. This is a simplification that makes it easier to use the +/// types in client code. Servers may prefer borrowing, using the [`RpcBorrow`] +/// trait. +pub trait RpcRecv: DeserializeOwned + fmt::Debug + Send + Sync + Unpin + 'static {} + +impl RpcRecv for T where T: DeserializeOwned + fmt::Debug + Send + Sync + Unpin + 'static {} + +/// An object that can be received over RPC, borrowing from the the +/// deserialization context. +/// +/// This marker trait is blanket-implemented for every qualifying type. It is +/// used to indicate that a type can be borrowed from the body of a wholly or +/// partially serialized JSON-RPC message. +pub trait RpcBorrow<'de>: Deserialize<'de> + fmt::Debug + Send + Sync + Unpin {} + +impl<'de, T> RpcBorrow<'de> for T where T: Deserialize<'de> + fmt::Debug + Send + Sync + Unpin {} + +/// An object that can be both sent and received over RPC. +/// +/// This marker trait is blanket-implemented for every qualifying type. It is +/// used to indicate that a type can be both sent and received in the body of a +/// JSON-RPC message. +/// +/// # Note +/// +/// We add the `'static` lifetime to the supertraits to indicate that the type +/// can't borrow. This is a simplification that makes it easier to use the +/// types in client code. Servers may prefer borrowing, using the +/// [`BorrowedRpcObject`] trait. +pub trait RpcObject: RpcSend + RpcRecv {} + +impl RpcObject for T where T: RpcSend + RpcRecv {} + +/// An object that can be both sent and received over RPC, borrowing from the +/// the deserialization context. +/// +/// This marker trait is blanket-implemented for every qualifying type. It is +/// used to indicate that a type can be both sent and received in the body of a +/// JSON-RPC message, and can borrow from the deserialization context. +pub trait BorrowedRpcObject<'de>: RpcBorrow<'de> + RpcSend {} + +impl<'de, T> BorrowedRpcObject<'de> for T where T: RpcBorrow<'de> + RpcSend {} diff --git a/src/pubsub/ipc.rs b/src/pubsub/ipc.rs new file mode 100644 index 0000000..525195d --- /dev/null +++ b/src/pubsub/ipc.rs @@ -0,0 +1,241 @@ +use bytes::{Buf, Bytes, BytesMut}; +use futures_util::Stream; +use interprocess::local_socket::{ + tokio::{Listener, RecvHalf, SendHalf}, + traits::tokio::Stream as _, + ListenerOptions, +}; +use pin_project::pin_project; +use serde_json::value::RawValue; +use std::{ + io, + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::io::{AsyncRead, AsyncWriteExt}; +use tokio_util::io::poll_read_buf; +use tracing::{debug, error, trace}; + +impl crate::pubsub::Listener for Listener { + type RespSink = SendHalf; + + type ReqStream = IpcBytesStream; + + type Error = io::Error; + + async fn accept(&self) -> Result<(Self::RespSink, Self::ReqStream), Self::Error> { + let conn = interprocess::local_socket::traits::tokio::Listener::accept(self).await?; + + let (recv, send) = conn.split(); + + Ok((send, recv.into())) + } +} + +impl crate::pubsub::JsonSink for SendHalf { + type Error = std::io::Error; + + async fn send_json(&mut self, json: Box) -> Result<(), Self::Error> { + self.write_all(json.get().as_bytes()).await + } +} + +impl crate::pubsub::Connect for ListenerOptions<'_> { + type Listener = Listener; + + type Error = io::Error; + + async fn make_listener(self) -> Result { + self.create_tokio() + } +} + +/// A stream that pulls data from the IPC connection and yields [`Bytes`] +/// containing +#[derive(Debug)] +#[pin_project] +pub struct IpcBytesStream { + #[pin] + inner: ReadJsonStream>, + + /// Whether the stream has been drained. + drained: bool, +} + +impl IpcBytesStream { + fn new(inner: RecvHalf) -> Self { + Self { + inner: inner.into(), + drained: false, + } + } +} + +impl From for IpcBytesStream { + fn from(inner: RecvHalf) -> Self { + Self::new(inner) + } +} + +impl Stream for IpcBytesStream { + type Item = Bytes; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match ready!(this.inner.poll_next(cx)) { + Some(item) => { + let item: Box = item.into(); + Poll::Ready(Some(Bytes::from_owner(item.into_boxed_bytes()))) + } + None => Poll::Ready(None), + } + } +} + +/// Default capacity for the IPC buffer. +const CAPACITY: usize = 4096; + +/// A stream of JSON-RPC items, read from an [`AsyncRead`] stream. +#[derive(Debug)] +#[pin_project::pin_project] +pub(crate) struct ReadJsonStream { + /// The underlying reader. + #[pin] + reader: T, + /// A buffer for reading data from the reader. + buf: BytesMut, + /// Whether the buffer has been drained. + drained: bool, + + /// PhantomData marking the item type this stream will yield. + _pd: std::marker::PhantomData, +} + +impl ReadJsonStream { + fn new(reader: T) -> Self { + Self { + reader, + buf: BytesMut::with_capacity(CAPACITY), + drained: true, + _pd: core::marker::PhantomData, + } + } +} + +impl From for ReadJsonStream { + fn from(reader: T) -> Self { + Self::new(reader) + } +} + +impl Stream for ReadJsonStream +where + Item: serde::de::DeserializeOwned, +{ + type Item = Item; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + + loop { + // try decoding from the buffer, but only if we have new data + if !*this.drained { + debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data"); + let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter(); + + let item = de.next(); + + // advance the buffer + this.buf.advance(de.byte_offset()); + + match item { + Some(Ok(response)) => { + return Poll::Ready(Some(response)); + } + Some(Err(err)) => { + if err.is_data() { + trace!( + buffer = %String::from_utf8_lossy(this.buf.as_ref()), + "IPC buffer contains invalid JSON data", + ); + + // this happens if the deserializer is unable to decode a partial object + *this.drained = true; + } else if err.is_eof() { + trace!("partial object in IPC buffer"); + // nothing decoded + *this.drained = true; + } else { + error!(%err, "IPC response contained invalid JSON. Buffer contents will be logged at trace level"); + trace!( + buffer = %String::from_utf8_lossy(this.buf.as_ref()), + "IPC response contained invalid JSON. NOTE: Buffer contents do not include invalid utf8.", + ); + + return Poll::Ready(None); + } + } + None => { + // nothing decoded + *this.drained = true; + } + } + } + + // read more data into the buffer + match ready!(poll_read_buf(this.reader.as_mut(), cx, &mut this.buf)) { + Ok(0) => { + // stream is no longer readable and we're also unable to decode any more + // data. This happens if the IPC socket is closed by the other end. + // so we can return `None` here. + debug!("IPC socket EOF, stream is closed"); + return Poll::Ready(None); + } + Ok(data_len) => { + debug!(%data_len, "Read data from IPC socket"); + // can try decoding again + *this.drained = false; + } + Err(err) => { + error!(%err, "Failed to read from IPC socket, shutting down"); + return Poll::Ready(None); + } + } + } + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `alloy` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/alloy-rs/alloy +// +// The MIT License (MIT) +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs new file mode 100644 index 0000000..0ca3c1d --- /dev/null +++ b/src/pubsub/mod.rs @@ -0,0 +1,67 @@ +//! Pubsub serving utils for [`Router`]s. +//! +//! This module provides pubsub functionality for serving [`Router`]s over +//! various connection types. Built-in support is provided for IPC and +//! Websockets, and a trait system is provided for custom connection types. +//! +//! ## Overview +//! +//! The pubsub module provides a way to serve a [`Router`] over pubsub +//! connections like IPC and Websockets. +//! +//! ## Usage +//! +//! Typically users want to use a [`Connect`] implementor to create a server +//! using [`Connect::run`]. This will create a [`Listener`], and spawn tasks +//! to accept connections via that [`Listener`] and route requests. +//! +//! ### Advanced Usage +//! +//! #### Custom Connector +//! +//! [`Connect`] has been implemented for +//! [`interprocess::local_socket::ListenerOptions`] (producing a +//! [`interprocess::local_socket::tokio::Listener`]), +//! and for [`SocketAddr`] (producing a [`TcpListener`]). +//! +//! Custom [`Connect`] implementors can configure the listener in any way they +//! need. This is useful for (e.g.) configuring network or security policies on +//! the inbound TLS connection. +//! +//! #### Custom Listener +//! +//! If you need more control over the server, you can create your own +//! [`Listener`]. This is useful if you need to customize the connection +//! handling, or if you need to use a different connection type. +//! +//! [`Listener`]'s associated stream and sink types are used to read requests +//! and write responses. These types must implement [`JsonReqStream`] and +//! [`JsonSink`] respectively. +//! +//! ## Internal Structure +//! +//! There are 3 tasks:x +//! - `ListenerTask` - listens for new connections, accepts, and spawns +//! `RouteTask` and `WriteTask` for each. A listener task is spawned for each +//! style of connection (e.g. IPC or Websockets). +//! - `RouteTask` - Reads requests from an inbound connection, and spawns a +//! tokio task for each request. There is 1 `RouteTask` per connection. +//! - `WriteTask` - Manages outbound connections, receives responses from the +//! router, and writes responses to the relevant connection. There is 1 +//! `WriteTask` per connection. +//! +//! [`Router`]: crate::Router +//! [`SocketAddr`]: std::net::SocketAddr +//! [`TcpListener`]: tokio::net::TcpListener + +#[cfg(feature = "ipc")] +mod ipc; + +mod shared; +pub use shared::{ConnectionId, ServerShutdown, DEFAULT_INSTRUCTION_BUFFER_PER_TASK}; + +mod r#trait; +pub use r#trait::{Connect, In, JsonReqStream, JsonSink, Listener, Out}; + +#[cfg(feature = "ws")] +mod ws; diff --git a/src/pubsub/shared.rs b/src/pubsub/shared.rs new file mode 100644 index 0000000..437a712 --- /dev/null +++ b/src/pubsub/shared.rs @@ -0,0 +1,320 @@ +use core::fmt; + +use crate::{ + pubsub::{In, JsonSink, Listener, Out}, + types::Request, + HandlerArgs, +}; +use serde_json::value::RawValue; +use tokio::{ + select, + sync::{mpsc, oneshot, watch}, + task::JoinHandle, +}; +use tokio_stream::StreamExt; +use tracing::{debug, debug_span, error, instrument, trace, Instrument}; + +/// Default instruction buffer size per task. +pub const DEFAULT_INSTRUCTION_BUFFER_PER_TASK: usize = 15; + +/// Type alias for identifying connections. +pub type ConnectionId = u64; + +/// Holds the shutdown signal for some server. +#[derive(Debug)] +pub struct ServerShutdown { + pub(crate) _shutdown: watch::Sender<()>, +} + +impl From> for ServerShutdown { + fn from(sender: watch::Sender<()>) -> Self { + Self { _shutdown: sender } + } +} + +/// The `ListenerTask` listens for new connections, and spawns `RouteTask`s for +/// each. +pub(crate) struct ListenerTask { + pub(crate) listener: T, + pub(crate) manager: ConnectionManager, +} + +impl ListenerTask +where + T: Listener, +{ + /// Task future, which will be run by [`Self::spawn`]. + /// + /// This future is a simple loop that accepts new connections, and uses + /// the [`ConnectionManager`] to handle them. + pub(crate) async fn task_future(self) { + let ListenerTask { + listener, + mut manager, + } = self; + + loop { + let (resp_sink, req_stream) = match listener.accept().await { + Ok((resp_sink, req_stream)) => (resp_sink, req_stream), + Err(err) => { + error!(%err, "Failed to accept connection"); + // TODO: should these errors be considered persistent? + continue; + } + }; + + manager.handle_new_connection::(req_stream, resp_sink); + } + } + + /// Spawn the future produced by [`Self::task_future`]. + pub(crate) fn spawn(self) -> JoinHandle<()> { + let future = self.task_future(); + tokio::spawn(future) + } +} + +/// The `ConnectionManager` provides connections with IDs, and handles spawning +/// the [`RouteTask`] for each connection. +pub(crate) struct ConnectionManager { + pub(crate) shutdown: watch::Receiver<()>, + + pub(crate) next_id: ConnectionId, + + pub(crate) router: crate::Router<()>, + + pub(crate) instruction_buffer_per_task: usize, +} + +impl ConnectionManager { + /// Increment the connection ID counter and return an unused ID. + fn next_id(&mut self) -> ConnectionId { + let id = self.next_id; + self.next_id += 1; + id + } + + /// Get a clone of the router. + fn router(&self) -> crate::Router<()> { + self.router.clone() + } + + /// Create new [`RouteTask`] and [`WriteTask`] for a connection. + fn make_tasks( + &self, + conn_id: ConnectionId, + requests: In, + connection: Out, + ) -> (RouteTask, WriteTask) { + let (tx, rx) = mpsc::channel(self.instruction_buffer_per_task); + + let (gone_tx, gone_rx) = oneshot::channel(); + + let rt = RouteTask { + router: self.router(), + conn_id, + write_task: tx, + requests, + gone: gone_tx, + }; + + let wt = WriteTask { + shutdown: self.shutdown.clone(), + gone: gone_rx, + conn_id, + json: rx, + connection, + }; + + (rt, wt) + } + + /// Spawn a new [`RouteTask`] and [`WriteTask`] for a connection. + fn spawn_tasks(&mut self, requests: In, connection: Out) { + let conn_id = self.next_id(); + let (rt, wt) = self.make_tasks::(conn_id, requests, connection); + rt.spawn(); + wt.spawn(); + } + + /// Handle a new connection, enrolling it in the write task, and spawning + /// its route task. + fn handle_new_connection(&mut self, requests: In, connection: Out) { + self.spawn_tasks::(requests, connection); + } +} + +/// Task that reads requests from a stream, and routes them to the +/// [`Router`], ensures responses are sent to the [`WriteTask`]. +/// +/// [`Router`]: crate::Router +struct RouteTask { + /// Router for handling requests. + pub(crate) router: crate::Router<()>, + /// Connection ID for the connection serviced by this task. + pub(crate) conn_id: ConnectionId, + /// Sender to the write task. + pub(crate) write_task: mpsc::Sender>, + /// Stream of requests. + pub(crate) requests: In, + /// Sender to the [`WriteTask`], to notify it that this task is done. + pub(crate) gone: oneshot::Sender<()>, +} + +impl fmt::Debug for RouteTask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RouteTask") + .field("conn_id", &self.conn_id) + .finish_non_exhaustive() + } +} + +impl RouteTask +where + T: crate::pubsub::Listener, +{ + /// Task future, which will be run by [`Self::spawn`]. + /// + /// This future is a simple loop, which reads requests from the stream, + /// and routes them to the router. For each request, a new task is spawned + /// to handle the request, and given a sender to the [`WriteTask`]. This + /// ensures that requests can be handled concurrently. + #[instrument(name = "RouteTask", skip(self), fields(conn_id = self.conn_id))] + pub async fn task_future(self) { + let RouteTask { + router, + mut requests, + write_task, + gone, + .. + } = self; + + loop { + select! { + biased; + _ = write_task.closed() => { + debug!("IpcWriteTask has gone away"); + break; + } + item = requests.next() => { + let Some(item) = item else { + trace!("IPC read stream has closed"); + break; + }; + + let Ok(req) = Request::try_from(item) else { + tracing::warn!("inbound request is malformatted"); + continue + }; + + let span = debug_span!("ipc request handling", id = req.id(), method = req.method()); + + let args = HandlerArgs { + ctx: write_task.clone().into(), + req, + }; + + let fut = router.handle_request(args); + let write_task = write_task.clone(); + + // Run the future in a new task. + tokio::spawn( + async move { + // Run the request handler and serialize the + // response. + let rv = fut.await.expect("infallible"); + + // Send the response to the write task. + // we don't care if the receiver has gone away, + // as the task is done regardless. + let _ = write_task.send( + rv + ).await; + } + .instrument(span) + ); + } + } + } + // No funny business. Drop the gone signal. + drop(gone); + } + + /// Spawn the future produced by [`Self::task_future`]. + pub(crate) fn spawn(self) -> tokio::task::JoinHandle<()> { + let future = self.task_future(); + tokio::spawn(future) + } +} + +/// The Write Task is responsible for writing JSON to the outbound connection. +struct WriteTask { + /// Shutdown signal. + /// + /// Shutdowns bubble back up to [`RouteTask`] when the write task is + /// dropped, via the closed `json` channel. + pub(crate) shutdown: watch::Receiver<()>, + + /// Signal that the connection has gone away. + pub(crate) gone: oneshot::Receiver<()>, + + /// ID of the connection. + pub(crate) conn_id: ConnectionId, + + /// JSON to be written to the outbound connection. + /// + /// Dropping this channel will cause the associated [`RouteTask`] to + /// shutdown. + pub(crate) json: mpsc::Receiver>, + + /// Outbound connections. + pub(crate) connection: Out, +} + +impl WriteTask { + /// Task future, which will be run by [`Self::spawn`]. + /// + /// This is a simple loop, that reads instructions from the instruction + /// channel, and acts on them. It handles JSON messages, and going away + /// instructions. It also listens for the global shutdown signal from the + /// [`ServerShutdown`] struct. + #[instrument(skip(self), fields(conn_id = self.conn_id))] + pub(crate) async fn task_future(self) { + let WriteTask { + mut shutdown, + mut gone, + mut json, + mut connection, + .. + } = self; + shutdown.mark_unchanged(); + loop { + select! { + biased; + _ = &mut gone => { + debug!("Connection has gone away"); + break; + } + _ = shutdown.changed() => { + debug!("shutdown signal received"); + break; + } + json = json.recv() => { + let Some(json) = json else { + tracing::error!("Json stream has closed"); + break; + }; + if let Err(err) = connection.send_json(json).await { + debug!(%err, "Failed to send json"); + break; + } + } + } + } + } + + /// Spawn the future produced by [`Self::task_future`]. + pub(crate) fn spawn(self) -> JoinHandle<()> { + tokio::spawn(self.task_future()) + } +} diff --git a/src/pubsub/trait.rs b/src/pubsub/trait.rs new file mode 100644 index 0000000..8d35399 --- /dev/null +++ b/src/pubsub/trait.rs @@ -0,0 +1,117 @@ +use crate::pubsub::{ + shared::{ConnectionManager, ListenerTask}, + ServerShutdown, +}; +use bytes::Bytes; +use serde_json::value::RawValue; +use std::future::Future; +use tokio::sync::watch; +use tokio_stream::Stream; + +/// Convenience alias for naming stream halves. +pub type Out = ::RespSink; + +/// Convenience alias for naming stream halves. +pub type In = ::ReqStream; + +/// Configuration objects for connecting a [`Listener`]. +pub trait Connect: Send + Sync + Sized { + /// The listener type produced by the connect object. + type Listener: Listener; + + /// The error type for instantiating a [`Listener`]. + type Error: core::error::Error + 'static; + + /// Create the listener + fn make_listener(self) -> impl Future> + Send; + + /// Configure the instruction buffer size for each task spawned by the + /// listener. This buffer will be allocated for EACH connection, and + /// represents the backpressure limit for notifications and responses + /// sent to the client. + fn instruction_buffer(&self) -> usize { + crate::pubsub::shared::DEFAULT_INSTRUCTION_BUFFER_PER_TASK + } + + /// Instantiate and run a task to accept connections, returning a shutdown + /// signal. + /// + /// We do not recommend overriding this method. Doing so will opt out of + /// the library's pubsub task system. Users overriding this method must + /// manually handle connection tasks. + fn run( + self, + router: crate::Router<()>, + ) -> impl Future> + Send { + async move { + let (tx, rx) = watch::channel(()); + ListenerTask { + listener: self.make_listener().await?, + manager: ConnectionManager { + shutdown: rx, + next_id: 0, + router, + instruction_buffer_per_task: + crate::pubsub::shared::DEFAULT_INSTRUCTION_BUFFER_PER_TASK, + }, + } + .spawn(); + Ok(tx.into()) + } + } +} + +/// A [`Listener`] accepts incoming connections and produces [`JsonSink`] and +/// [`JsonReqStream`] objects. +/// +/// Typically this is done by producing a combined object with a [`Stream`] and +/// a [`Sink`], then using [`StreamExt::split`], however this trait allows for +/// more complex implementations. +/// +/// It is expected that the stream or sink may have stream adapters that wrap +/// the underlying transport objects. +/// +/// [`ReadJsonStream`]: https://docs.rs/alloy-transport-ipc/latest/alloy_transport_ipc/struct.ReadJsonStream.html +/// [`Sink`]: futures_util::sink::Sink +/// [`StreamExt::split`]: futures_util::stream::StreamExt::split +pub trait Listener: Send + 'static { + /// The sink type produced by the listener. + type RespSink: JsonSink; + /// The stream type produced by the listener. + type ReqStream: JsonReqStream; + /// The error type for the listener. + type Error: core::error::Error; + + /// Accept an inbound connection, and split it into a sink and stream. + fn accept( + &self, + ) -> impl Future> + Send; +} + +/// A sink that accepts JSON +pub trait JsonSink: Send + 'static { + /// Error type for the sink. + type Error: core::error::Error + 'static; + + /// Send json to the sink. + fn send_json( + &mut self, + json: Box, + ) -> impl Future> + Send; +} + +impl JsonSink for tokio::sync::mpsc::Sender> { + type Error = tokio::sync::mpsc::error::SendError>; + + fn send_json( + &mut self, + json: Box, + ) -> impl Future> + Send { + self.send(json) + } +} + +/// A stream of inbound [`Bytes`] objects. +pub trait JsonReqStream: Stream + Send + Unpin + 'static {} + +impl JsonReqStream for T where T: Stream + Send + Unpin + 'static {} diff --git a/src/pubsub/ws.rs b/src/pubsub/ws.rs new file mode 100644 index 0000000..3b37e5d --- /dev/null +++ b/src/pubsub/ws.rs @@ -0,0 +1,117 @@ +use bytes::Bytes; +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, Stream, StreamExt, +}; +use serde_json::value::RawValue; +use std::{ + future::Future, + net::SocketAddr, + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_tungstenite::{accept_async, tungstenite::protocol::Message, WebSocketStream}; +use tracing::{debug, debug_span, Instrument}; + +/// Sending half of a [`WebSocketStream`] +pub(crate) type SendHalf = SplitSink, Message>; + +/// Receiving half of a [`WebSocketStream`]. +pub(crate) type RecvHalf = SplitStream>; + +/// Simple stream adapter for extracting text from a [`WebSocketStream`]. +#[derive(Debug)] +pub struct WsJsonStream { + inner: RecvHalf, + complete: bool, +} + +impl From for WsJsonStream { + fn from(inner: RecvHalf) -> Self { + Self { + inner, + complete: false, + } + } +} + +impl WsJsonStream { + /// Handle an incoming [`Message`] + fn handle(&self, message: Message) -> Result, &'static str> { + match message { + Message::Text(text) => Ok(Some(text.into())), + Message::Close(Some(frame)) => { + let s = "Received close frame with data"; + debug!(reason = %frame, "{}", &s); + Err(s) + } + Message::Close(None) => { + let s = "WS client has gone away"; + debug!("{}", &s); + Err(s) + } + _ => Ok(None), + } + } +} + +impl Stream for WsJsonStream { + type Item = Bytes; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if self.complete { + return Poll::Ready(None); + } + + let Some(Ok(msg)) = ready!(self.inner.poll_next_unpin(cx)) else { + self.complete = true; + return Poll::Ready(None); + }; + + match self.handle(msg) { + Ok(Some(item)) => return Poll::Ready(Some(item)), + Ok(None) => continue, + Err(_) => self.complete = true, + } + } + } +} + +impl crate::pubsub::JsonSink for SendHalf { + type Error = tokio_tungstenite::tungstenite::Error; + + async fn send_json(&mut self, json: Box) -> Result<(), Self::Error> { + self.send(Message::text(json.get())).await + } +} + +impl crate::pubsub::Listener for TcpListener { + type RespSink = SendHalf; + + type ReqStream = WsJsonStream; + + type Error = tokio_tungstenite::tungstenite::Error; + + async fn accept(&self) -> Result<(Self::RespSink, Self::ReqStream), Self::Error> { + let (stream, socket_addr) = self.accept().await?; + + let span = debug_span!("ws connection", remote_addr = %socket_addr); + + let ws_stream = accept_async(stream).instrument(span).await?; + + let (send, recv) = ws_stream.split(); + + Ok((send, recv.into())) + } +} + +impl crate::pubsub::Connect for SocketAddr { + type Listener = TcpListener; + type Error = std::io::Error; + + fn make_listener(self) -> impl Future> + Send { + TcpListener::bind(self) + } +} diff --git a/src/router.rs b/src/router.rs new file mode 100644 index 0000000..2032b5d --- /dev/null +++ b/src/router.rs @@ -0,0 +1,559 @@ +//! JSON-RPC router. + +use crate::{ + routes::{MakeErasedHandler, RouteFuture}, + BoxedIntoRoute, ErasedIntoRoute, Handler, HandlerArgs, Method, MethodId, RegistrationError, + Route, +}; +use core::fmt; +use serde_json::value::RawValue; +use std::{borrow::Cow, collections::BTreeMap, convert::Infallible, sync::Arc, task::Poll}; +use tower::Service; +use tracing::{debug_span, trace}; + +/// A JSON-RPC router. This is the top-level type for handling JSON-RPC +/// requests. It is heavily inspired by the [`axum::Router`] type. +/// +/// A router manages a collection of "methods" that can be called by clients. +/// Each method is associated with a handler that processes the request and +/// returns a response. The router is responsible for routing requests to the +/// appropriate handler. +/// +/// Methods can be added to the router using the [`Router::route`] family of +/// methods: +/// +/// - [`Router::route`]: Add a method with a [`Handler`]. The [`Handler`] will +/// be invoked with the request parameters and provided state. +/// +/// ## Basic Example +/// +/// Routers are constructed via builder-style methods. The following example +/// demonstrates how to create a router with two methods, `double` and `add`. +/// The `double` method doubles the provided number, and the `add` method adds +/// the provided number to some state, and returns the sum. +/// +/// These methods can +/// +/// ``` +/// use ajj::{Router}; +/// +/// # fn main() { +/// // Provide methods called "double" and "add" to the router. +/// let router = Router::::new() +/// .route("double", |params: u64| async move { +/// Ok::<_, ()>(params * 2) +/// }) +/// .route("add", |params: u64, state: u64| async move { +/// Ok::<_, ()>(params + state) +/// }) +/// // The router is provided with state, and is now ready to serve requests. +/// .with_state::<()>(3u64); +/// # } +/// ``` +/// +/// +/// ## Note +/// +/// The state `S` is "missing" state. It is state that must be added to the +/// router (and therefore to the methods) before it can be used. To add state, +/// use the [`Router::with_state`] method. See that method for more +/// information. +/// +/// Analagous to [`axum::Router`]. +/// +/// [`axum::Router`]: https://docs.rs/axum/latest/axum/struct.Router.html +#[must_use = "Routers do nothing unless served."] +pub struct Router { + inner: Arc>, +} + +impl Clone for Router { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +impl Default for Router +where + S: Send + Sync + Clone + 'static, +{ + fn default() -> Self { + Self::new() + } +} + +impl Router +where + S: Send + Sync + Clone + 'static, +{ + /// Create a new, empty router. + pub fn new() -> Self { + Self { + inner: Arc::new(RouterInner::new()), + } + } + + /// If this router is the only reference to its inner state, return the + /// inner state. Otherwise, clone the inner state and return the clone. + fn into_inner(self) -> RouterInner { + match Arc::try_unwrap(self.inner) { + Ok(inner) => inner, + Err(arc) => RouterInner { + routes: arc.routes.clone(), + last_id: arc.last_id, + fallback: arc.fallback.clone(), + name_to_id: arc.name_to_id.clone(), + id_to_name: arc.id_to_name.clone(), + }, + } + } + + /// Add state to the router, readying methods that require that state. + /// + /// Note that the type parameter `S2` is NOT the state you are adding to the + /// router. It is additional state that must be added AFTER the state `S`. + pub fn with_state(self, state: S) -> Router { + map_inner!(self, inner => inner.with_state(&state)) + } + + /// Add a fallback [`Route`]. This route will be called when no method + /// names match the request. + /// + /// ## Note + /// + /// If unset, a default fallback route will be used that returns the error + /// generated by [`ResponsePayload::method_not_found`]. + /// + /// [`ResponsePayload::method_not_found`]: crate::ResponsePayload::method_not_found + pub(crate) fn fallback_stateless(self, route: Route) -> Self { + tap_inner!(self, mut this => { + this.fallback = Method::Ready(route); + }) + } + + /// Add a fallback [`ErasedIntoRoute`] handler. The handler will be cloned + /// and boxed. This handler will be called when no method names match the + /// request. + /// + /// ## Notes + /// + /// The `S` type parameter is "missing" state. It is state that must be + /// added to the router (and therefore to the methods) before it can be + /// used. To add state, use the [`Router::with_state`] method. See that + /// method for more information. + /// + /// If unset, a default fallback route will be used that returns the error + /// generated by [`ResponsePayload::method_not_found`]. + /// + /// [`ResponsePayload::method_not_found`]: crate::ResponsePayload::method_not_found + fn fallback_erased(self, handler: E) -> Self + where + E: ErasedIntoRoute, + { + tap_inner!(self, mut this => { + this.fallback = Method::Needs(BoxedIntoRoute(handler.clone_box())); + }) + } + + /// Add a fallback [`Handler`] to the router. This handler will be called + /// when no method names match the request. + /// + /// ## Notes + /// + /// The `S` type parameter is "missing" state. It is state that must be + /// added to the router (and therefore to the methods) before it can be + /// used. To add state, use the [`Router::with_state`] method. See that + /// method for more information. + /// + /// If unset, a default fallback route will be used that returns the error + /// generated by [`ResponsePayload::method_not_found`]. + /// + /// [`ResponsePayload::method_not_found`]: crate::ResponsePayload::method_not_found + pub fn fallback(self, handler: H) -> Self + where + H: Handler, + T: Send + 'static, + S: Clone + Send + Sync + 'static, + { + self.fallback_erased(MakeErasedHandler::from_handler(handler)) + } + + /// Add a fallback [`Service`] handler. This handler will be called when no + /// method names match the request. + /// + /// ## Note + /// + /// If unset, a default fallback route will be used that returns the error + /// generated by [`ResponsePayload::method_not_found`]. + /// + /// [`ResponsePayload::method_not_found`]: crate::ResponsePayload::method_not_found + pub fn fallback_service(self, service: T) -> Self + where + T: Service< + HandlerArgs, + Response = Box, + Error = Infallible, + Future: Send + 'static, + > + Clone + + Send + + Sync + + 'static, + { + self.fallback_stateless(Route::new(service)) + } + + /// Add a method with a [`Handler`] to the router. + /// + /// ## Note + /// + /// The `S` type parameter is "missing" state. It is state that must be + /// added to the router (and therefore to the methods) before it can be + /// used. To add state, use the [`Router::with_state`] method. See that + /// method for more information. + /// + /// # Panics + /// + /// Panics if the method name already exists in the router. + pub fn route(self, method: impl Into>, handler: H) -> Self + where + H: Handler, + T: Send + 'static, + S: Clone + Send + Sync + 'static, + { + tap_inner!(self, mut this => { + this = this.route(method, handler); + }) + } + + /// Nest a router under a prefix. This is useful for grouping related + /// methods together, or for namespacing logical groups of methods. + /// + /// The prefix is trimmed of any trailing underscores, and a single + /// underscore is added between the prefix and the method name. + /// + /// I.e. the following are equivalent and will all produce the `foo_` + /// prefix for the methods in `other`: + /// - `router.nest("foo", other)` + /// - `router.nest("foo_", other)` + /// - `router.nest("foo__", other)` + /// + /// # Panics + /// + /// Panics if collision occurs between the prefixed methods from `other` and + /// the methods in `self`. E.g. if the prefix is `foo`, `self` has a + /// method `foo_bar`, and `other` has a method `bar`. + pub fn nest(self, prefix: impl Into>, other: Self) -> Self { + let prefix = prefix.into(); + + let mut this = self.into_inner(); + let prefix = Cow::Borrowed(prefix.trim_end_matches('_')); + + let RouterInner { + routes, id_to_name, .. + } = other.into_inner(); + + for (id, handler) in routes.into_iter() { + let existing_name = id_to_name + .get(&id) + .expect("nested router has missing name for existing method"); + let method = format!("{}_{}", prefix, existing_name); + panic_on_err!(this.enroll_method(method.into(), handler)); + } + + Self { + inner: Arc::new(this), + } + } + + /// Merge two routers together. This enrolls all methods from `other` into + /// `self`. + /// + /// # Panics + /// + /// Panics if a method name collision occurs between the two routers. I.e. + /// if any method from `other` has the same name as a method in `self`. + pub fn merge(self, other: Self) -> Self { + let mut this = self.into_inner(); + let RouterInner { + routes, + mut id_to_name, + .. + } = other.into_inner(); + + for (id, handler) in routes.into_iter() { + let existing_name = id_to_name + .remove(&id) + .expect("nested router has missing name for existing method"); + panic_on_err!(this.enroll_method(existing_name, handler)); + } + + Self { + inner: Arc::new(this), + } + } + + /// Call a method on the router, by providing the necessary state. + /// + /// This is a convenience method, primarily for testing. Use in production + /// code is discouraged. Routers should not be left in incomplete states. + pub fn call_with_state(&self, args: HandlerArgs, state: S) -> RouteFuture { + let id = args.req.id_owned(); + let method = args.req.method(); + + let span = debug_span!("Router::call_with_state", %method, %id); + trace!(params = args.req.params()); + + self.inner.call_with_state(args, state).with_span(span) + } + + /// Nest this router into a new Axum router, with the specified path. + #[cfg(feature = "axum")] + pub fn into_axum(self, path: &str) -> axum::Router { + axum::Router::new().route(path, axum::routing::post(self)) + } +} + +impl Router<()> { + // /// Serve the router over a connection. This method returns a + // /// [`ServerShutdown`], which will shut down the server when dropped. + // /// + // /// [`ServerShutdown`]: crate::pubsub::ServerShutdown + // #[cfg(feature = "pubsub")] + // pub async fn serve_pubsub( + // self, + // connect: C, + // ) -> Result { + // connect.run(self).await + // } + + /// Call a method on the router. + pub fn handle_request(&self, args: HandlerArgs) -> RouteFuture { + self.call_with_state(args, ()) + } +} + +impl fmt::Debug for Router { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Router").finish_non_exhaustive() + } +} + +impl tower::Service for Router<()> { + type Response = Box; + type Error = Infallible; + type Future = RouteFuture; + + fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, args: HandlerArgs) -> Self::Future { + self.handle_request(args) + } +} + +impl tower::Service for &Router<()> { + type Response = Box; + type Error = Infallible; + type Future = RouteFuture; + + fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, args: HandlerArgs) -> Self::Future { + self.handle_request(args) + } +} + +/// The inner state of a [`Router`]. Maps methods to their handlers. +pub(crate) struct RouterInner { + /// A map from method IDs to their handlers. + routes: BTreeMap>, + + /// The last ID assigned to a method. + last_id: MethodId, + + /// The handler to call when no method is found. + fallback: Method, + + // next 2 fields are used for reverse lookup of method names + /// A map from method names to their IDs. + name_to_id: BTreeMap, MethodId>, + /// A map from method IDs to their names. + id_to_name: BTreeMap>, +} + +impl Default for RouterInner<()> { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for RouterInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RouterInner").finish_non_exhaustive() + } +} + +impl RouterInner { + /// Create a new, empty router. + pub(crate) fn new() -> Self { + Self { + routes: BTreeMap::new(), + + last_id: Default::default(), + + fallback: Method::Ready(Route::default_fallback()), + + name_to_id: BTreeMap::new(), + id_to_name: BTreeMap::new(), + } + } + + /// Add state to the router, readying methods that require that state. + /// + /// Note that the type parameter `S2` is NOT the state you are adding to the + /// router. It is additional state that must be added AFTER the state `S`. + pub(crate) fn with_state(self, state: &S) -> RouterInner + where + S: Clone, + { + RouterInner { + routes: self + .routes + .into_iter() + .map(|(id, method)| (id, method.with_state(state))) + .collect(), + fallback: self.fallback.with_state(state), + last_id: self.last_id, + name_to_id: self.name_to_id, + id_to_name: self.id_to_name, + } + } + + /// Get the next available ID. + fn get_id(&mut self) -> MethodId { + self.last_id += 1; + self.last_id + } + + /// Get a method by its name. + fn method_by_name(&self, name: &str) -> Option<&Method> { + self.name_to_id.get(name).and_then(|id| self.routes.get(id)) + } + + /// Enroll a method name, returning an ID assignment. Panics if the method + /// name already exists in the router. + #[track_caller] + fn enroll_method_name( + &mut self, + method: Cow<'static, str>, + ) -> Result { + if self.name_to_id.contains_key(&method) { + return Err(RegistrationError::method_already_registered(method)); + } + + let id = self.get_id(); + self.name_to_id.insert(method.clone(), id); + self.id_to_name.insert(id, method.clone()); + Ok(id) + } + + /// Enroll a method name, returning an ID assignment. Panics if the method + /// name already exists in the router. + fn enroll_method( + &mut self, + method: Cow<'static, str>, + handler: Method, + ) -> Result { + self.enroll_method_name(method).inspect(|id| { + self.routes.insert(*id, handler); + }) + } + + /// Add a method to the router. This method may be missing state `S`. + /// + /// # Panics + /// + /// Panics if the method name already exists in the router. + #[track_caller] + fn route_erased(mut self, method: impl Into>, handler: E) -> Self + where + E: ErasedIntoRoute, + { + let method = method.into(); + let handler = handler.clone_box(); + + add_method_inner(&mut self, method, handler); + + fn add_method_inner( + this: &mut RouterInner, + method: Cow<'static, str>, + handler: Box>, + ) { + panic_on_err!(this.enroll_method(method, Method::Needs(BoxedIntoRoute(handler)))); + } + + self + } + + /// Add a [`Handler`] to the router. + /// + /// # Panics + /// + /// Panics if the method name already exists in the router. + pub(crate) fn route(self, method: impl Into>, handler: H) -> Self + where + H: Handler, + T: Send + 'static, + S: Clone + Send + Sync + 'static, + { + self.route_erased(method, MakeErasedHandler::from_handler(handler)) + } + + /// Call a method on the router, with the provided state. + #[track_caller] + pub(crate) fn call_with_state(&self, args: HandlerArgs, state: S) -> RouteFuture { + let method = args.req.method(); + self.method_by_name(method) + .unwrap_or(&self.fallback) + .call_with_state(args, state) + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `axum` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/tokio-rs/axum/blob/f84105ae8b078109987b089c47febc3b544e6b80/axum/src/routing/mod.rs#L119 +// +// The MIT License (MIT) +// +// Copyright (c) 2019 Axum Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/routes/ctx.rs b/src/routes/ctx.rs new file mode 100644 index 0000000..4598d86 --- /dev/null +++ b/src/routes/ctx.rs @@ -0,0 +1,77 @@ +use serde_json::value::RawValue; +use tokio::sync::mpsc; +use tracing::error; + +use crate::RpcSend; + +/// Errors that can occur when sending notifications. +#[derive(thiserror::Error, Debug)] +pub enum NotifyError { + /// An error occurred while serializing the notification. + #[error("failed to serialize notification: {0}")] + Serde(#[from] serde_json::Error), + /// The notification channel was closed. + #[error("notification channel closed")] + Send(#[from] mpsc::error::SendError>), +} + +/// A context for handler requests that allow the handler to send notifications +/// from long-running tasks (e.g. subscriptions). +/// +/// This is primarily intended to enable subscriptions over pubsub transports +/// to send notifications to clients. It is expected that JSON sent via the +/// notification channel is a valid JSON-RPC 2.0 object. +#[derive(Debug, Clone, Default)] +pub struct HandlerCtx { + pub(crate) notifications: Option>>, +} + +impl From>> for HandlerCtx { + fn from(notifications: mpsc::Sender>) -> Self { + Self { + notifications: Some(notifications), + } + } +} + +impl HandlerCtx { + /// Instantiate a new handler context. + pub const fn new() -> Self { + Self { + notifications: None, + } + } + + /// Instantiation a new handler context with notifications enabled. + pub const fn with_notifications(notifications: mpsc::Sender>) -> Self { + Self { + notifications: Some(notifications), + } + } + + /// Get a reference to the notification sender. This is used to + /// send notifications over pubsub transports. + pub const fn notifications(&self) -> Option<&mpsc::Sender>> { + self.notifications.as_ref() + } + + /// Notify a client of an event. + pub async fn notify(&self, t: &T) -> Result<(), NotifyError> { + if let Some(notifications) = self.notifications.as_ref() { + let ser = serde_json::to_string(t)?; + let rv = serde_json::value::to_raw_value(&ser)?; + notifications.send(rv).await?; + } + + Ok(()) + } +} + +/// Arguments passed to a handler. +#[derive(Debug, Clone)] +pub struct HandlerArgs { + /// The handler context. + pub ctx: HandlerCtx, + /// The JSON-RPC request. + pub req: crate::types::Request, +} diff --git a/src/routes/erased.rs b/src/routes/erased.rs new file mode 100644 index 0000000..02e5548 --- /dev/null +++ b/src/routes/erased.rs @@ -0,0 +1,153 @@ +use crate::{routes::HandlerArgs, Handler, Route}; +use tower::Service; + +use super::HandlerInternal; + +/// A boxed, erased type that can be converted into a [`Route`]. Similar to +/// axum's [`ErasedIntoRoute`] +/// +/// Currently this is a placeholder to enable future convenience functions +/// +/// [`ErasedIntoRoute`]: https://github.com/tokio-rs/axum/blob/f84105ae8b078109987b089c47febc3b544e6b80/axum/src/boxed.rs#L61-L68 +pub(crate) trait ErasedIntoRoute: Send + Sync { + /// Take a reference to this type, clone it, box it, and type erase it. + /// + /// This allows it to be stored in a collection of `dyn + /// ErasedIntoRoute`. + fn clone_box(&self) -> Box>; + + /// Convert this type into a handler. + fn into_route(self: Box, state: S) -> Route; + + /// Call this handler with the given state. + #[allow(dead_code)] + fn call_with_state( + self: Box, + args: HandlerArgs, + state: S, + ) -> >::Future; +} + +/// A boxed, erased type that can be converted into a [`Route`]. It is a +/// wrapper around a dyn [`ErasedIntoRoute`]. +/// +/// Similar to axum's [`BoxedIntoRoute`] +/// +/// [`BoxedIntoRoute`]: https://github.com/tokio-rs/axum/blob/18a99da0b0baf9eeef326b34525826ae0b5a1370/axum/src/boxed.rs#L12 +pub(crate) struct BoxedIntoRoute(pub(crate) Box>); + +#[allow(dead_code)] +impl BoxedIntoRoute { + /// Convert this into a [`Route`] with the given state. + pub(crate) fn into_route(self, state: S) -> Route { + self.0.into_route(state) + } +} + +impl Clone for BoxedIntoRoute { + fn clone(&self) -> Self { + Self(self.0.clone_box()) + } +} + +#[allow(dead_code)] +impl BoxedIntoRoute { + pub(crate) fn from_handler(handler: H) -> Self + where + H: Handler, + T: Send + 'static, + S: Clone + Send + Sync + 'static, + { + Self(Box::new(MakeErasedHandler::from_handler(handler))) + } +} + +/// Adapter to convert [`Handler`]s into [`ErasedIntoRoute`]s. +pub(crate) struct MakeErasedHandler { + pub(crate) handler: H, + pub(crate) into_route: fn(H, S) -> Route, +} + +impl Clone for MakeErasedHandler +where + H: Clone, +{ + fn clone(&self) -> Self { + Self { + handler: self.handler.clone(), + into_route: self.into_route, + } + } +} + +impl MakeErasedHandler { + /// Create a new [`MakeErasedHandler`] with the given handler and conversion + /// function. + pub(crate) fn from_handler(handler: H) -> Self + where + H: Handler, + T: Send + 'static, + S: Clone + Send + Sync + 'static, + { + MakeErasedHandler { + handler, + into_route: |handler, state| HandlerInternal::into_route(handler, state), + } + } +} + +impl ErasedIntoRoute for MakeErasedHandler +where + H: Clone + Send + Sync + 'static, + S: 'static, +{ + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } + + fn into_route(self: Box, state: S) -> Route { + (self.into_route)(self.handler, state) + } + + fn call_with_state( + self: Box, + args: HandlerArgs, + state: S, + ) -> >::Future { + self.into_route(state).call(args) + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `axum` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/tokio-rs/axum/blob/f84105ae8b078109987b089c47febc3b544e6b80/axum/src/routing/mod.rs#L119 +// +// The MIT License (MIT) +// +// Copyright (c) 2019 Axum Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/routes/future.rs b/src/routes/future.rs new file mode 100644 index 0000000..369f193 --- /dev/null +++ b/src/routes/future.rs @@ -0,0 +1,106 @@ +use crate::routes::HandlerArgs; +use core::fmt; +use pin_project::pin_project; +use serde_json::value::RawValue; +use std::{ + convert::Infallible, + future::Future, + task::{Context, Poll}, +}; +use tower::util::{BoxCloneSyncService, Oneshot}; + +/// A future produced by +/// +/// [`Route`]: crate::routes::Route +#[pin_project] +pub struct RouteFuture { + /// The inner [`Route`] future. + /// + /// [`Route`]: crate::routes::Route + #[pin] + inner: Oneshot, Infallible>, HandlerArgs>, + /// The span (if any). + span: Option, +} + +impl RouteFuture { + /// Create a new route future. + pub const fn new( + inner: Oneshot, Infallible>, HandlerArgs>, + ) -> Self { + Self { inner, span: None } + } + + /// Create a new method future with a span, this behaves as an + /// [`Instrumented`], with the span entered when the future is polled. + /// + /// [`Instrumented`]: tracing::instrument::Instrumented + pub const fn new_with_span( + inner: Oneshot, Infallible>, HandlerArgs>, + span: tracing::Span, + ) -> Self { + Self { + inner, + span: Some(span), + } + } + + /// Set the span for the future. + pub fn with_span(self, span: tracing::Span) -> Self { + Self { + span: Some(span), + ..self + } + } +} + +impl fmt::Debug for RouteFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RouteFuture").finish_non_exhaustive() + } +} + +impl Future for RouteFuture { + type Output = Result, Infallible>; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let _enter = this.span.as_ref().map(tracing::Span::enter); + + this.inner.poll(cx) + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `axum` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/tokio-rs/axum/ +// +// The MIT License (MIT) +// +// Copyright (c) 2019 Axum Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/routes/handler.rs b/src/routes/handler.rs new file mode 100644 index 0000000..b5f7375 --- /dev/null +++ b/src/routes/handler.rs @@ -0,0 +1,676 @@ +use crate::{ + routes::HandlerArgs, types::Response, HandlerCtx, ResponsePayload, Route, RpcRecv, RpcSend, +}; +use serde_json::value::RawValue; +use std::{convert::Infallible, future::Future, marker::PhantomData, pin::Pin, task}; + +macro_rules! convert_result { + ($res:expr) => {{ + match $res { + Ok(val) => ResponsePayload::Success(val), + Err(err) => ResponsePayload::internal_error_with_obj(err), + } + }}; +} + +/// A trait describing handlers for JSON-RPC methods. +/// +/// Handlers map some input type `T` to a future that resolve to a +/// [`ResponsePayload`]. The handler may also require some state `S` to operate. +/// +/// ### Returning error messages +/// +/// Note that when a [`Handler`] returns a `Result`, the error type will be in +/// the [`ResponsePayload`]'s `data` field, and the message and code will +/// indicate an internal server error. To return a response with an error code +/// and custom `message` field, use a handler that returns an instance of the +/// [`ResponsePayload`] enum, and instantiate that error payload manually. +/// +/// ``` +/// # use ajj::ResponsePayload; +/// let handler_a = || async { Err::<(), _>("appears in \"data\"") }; +/// let handler_b = || async { +/// ResponsePayload::<(), ()>::internal_error_message("appears in \"message\"".into()) +/// }; +/// ``` +/// +/// ### Handler return type inference +/// +/// Handlers that always suceed or always fail may have trouble with type +/// inference, as they contain an unknown type parameter, which could be +/// anything. Here's an example of code with failed type inference: +/// +/// ```compile_fail +/// # use ajj::ResponsePayload; +/// // cannot infer type of the type parameter `T` declared on the enum `Result` +/// let cant_infer_ok = || async { Err(1) }; +/// +/// // cannot infer type of the type parameter `E` declared on the enum `Result` +/// let cant_infer_err = || async { Ok(2) }; +/// +/// // cannot infer type of the type parameter `ErrData` declared on the enum `ResponsePayload` +/// let cant_infer_failure = || async { ResponsePayload::Success(3) }; +/// +/// // cannot infer type of the type parameter `ErrData` declared on the enum `ResponsePayload` +/// let cant_infer_success = || async { ResponsePayload::internal_error_with_obj(4) }; +/// ``` +/// +/// If you encounter these sorts of inference errors, you can add turbofish to +/// your handlers' return values like so: +/// +/// ``` +/// # use ajj::ResponsePayload; +/// // specify the Err on your Ok +/// let handler_a = || async { Ok::<_, ()>(1) }; +/// +/// // specify the Ok on your Err +/// let handler_b = || async { Err::<(), _>(2) }; +/// +/// // specify the ErrData on your Success +/// let handler_c = || async { ResponsePayload::Success::<_, ()>(3) }; +/// +/// // specify the Payload on your Failure +/// let handler_d = || async { ResponsePayload::<(), _>::internal_error_with_obj(4) }; +/// ``` +/// +/// ## Note on `S` +/// +/// The `S` type parameter is "missing" state. It represents the state that the +/// handler needs to operate. This state is passed to the handler when calling +/// [`Handler::call_with_state`]. +/// +/// ## Blanket Implementations +/// +/// This trait is blanket implemented for the following function and closure +/// types, where `Fut` is a [`Future`] returning either [`ResponsePayload`] or +/// [`Result`]: +/// +/// - async fn() +/// - async fn(Params) -> Fut +/// - async fn(HandlerCtx, Params) -> Fut +/// - async fn(Params, S) -> Fut +/// - async fn(HandlerCtx, Params) -> Fut +/// - async fn(HandlerCtx, Params, S) -> Fut +/// +/// ### Implementer's note: +/// +/// The generics on the implementation of the `Handler` trait are actually very +/// straightforward, even though they look intimidating. +/// +/// The `T` type parameter is a **marker** and never needs be constructed. +/// It exists to differentiate the output impls, so that the trait can be +/// blanket implemented for many different function types. If it were simply +/// `Handler`, there could only be 1 blanket impl. +/// +/// However the `T` type must constrain the relevant input types. When +/// implementing `Handler` for your own type, it is recommended to do the +/// following: +/// +/// ``` +/// use ajj::{Handler, HandlerArgs, ResponsePayload, RawValue}; +/// use std::{pin::Pin, future::Future}; +/// +/// /// A marker type to differentiate this handler from other blanket impls. +/// pub struct MyMarker { +/// _sealed: (), +/// } +/// +/// #[derive(Clone)] +/// pub struct MyHandler; +/// +/// // the `T` type parameter should be a tuple, containing your marker type, +/// // and the components that your handler actually uses. This ensures that +/// // the implementations are always unambiguous. +/// // +/// // e.g. +/// // - (MyMarker, ) +/// // - (MyMarker, HandlerArgs) +/// // - (MyMarker, HandlerArgs, Params) +/// // etc. +/// +/// impl Handler<(MyMarker, ), S> for MyHandler { +/// type Future = Pin> + Send>>; +/// +/// fn call_with_state(self, _args: HandlerArgs, _state: S) -> Self::Future { +/// todo!("use nothing but your struct") +/// } +/// } +/// +/// impl Handler<(MyMarker, HandlerArgs), S> for MyHandler { +/// type Future = Pin> + Send>>; +/// +/// fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { +/// todo!("use the args") +/// } +/// } +/// ``` +/// +/// +pub trait Handler: Clone + Send + Sync + Sized + 'static { + /// The future returned by the handler. + type Future: Future> + Send + 'static; + + /// Call the handler with the given request and state. + fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future; +} + +/// Extension trait for [`Handler`]s. +pub(crate) trait HandlerInternal: Handler { + /// Create a new handler that wraps this handler and has some state. + fn with_state(self, state: S) -> HandlerService { + HandlerService::new(self, state) + } + + /// Convert the handler into a [`Route`], ready for internal registration + /// on a [`Router`]. + /// + /// [`Router`]: crate::Router + #[allow(private_interfaces)] + fn into_route(self, state: S) -> Route + where + T: Send + 'static, + S: Clone + Send + Sync + 'static, + { + Route::new(self.with_state(state)) + } +} + +impl HandlerInternal for H where H: Handler {} + +/// A [`Handler`] with some state `S` and params type `T`. +#[derive(Debug)] +pub(crate) struct HandlerService { + handler: H, + state: S, + _marker: std::marker::PhantomData T>, +} + +impl Clone for HandlerService +where + H: Clone, + S: Clone, +{ + fn clone(&self) -> Self { + Self { + handler: self.handler.clone(), + state: self.state.clone(), + _marker: PhantomData, + } + } +} + +impl HandlerService { + /// Create a new handler service. + pub(crate) const fn new(handler: H, state: S) -> Self { + Self { + handler, + state, + _marker: PhantomData, + } + } +} + +impl tower::Service for HandlerService +where + Self: Clone, + H: Handler, + T: Send + 'static, + S: Clone + Send + Sync + 'static, +{ + type Response = Box; + type Error = Infallible; + type Future = Pin, Infallible>> + Send>>; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, args: HandlerArgs) -> Self::Future { + let this = self.clone(); + Box::pin(async move { Ok(this.handler.call_with_state(args, this.state.clone()).await) }) + } +} + +/// A marker type for handlers that return a [`Result`]. +/// +/// This type should never be constructed, and importing it is almost certainly +/// a mistake. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OutputResult { + _sealed: (), +} + +/// A marker type for handlers that return a [`ResponsePayload`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OutputResponsePayload { + _sealed: (), +} + +impl Handler<(OutputResponsePayload,), S> for F +where + F: FnOnce() -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Payload: RpcSend, + ErrData: RpcSend, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { + let id = args.req.id_owned(); + + Box::pin(async move { + let payload = self().await; + + Response { + id, + payload: &payload, + } + .to_json() + }) + } +} + +impl Handler<(OutputResponsePayload, HandlerCtx), S> for F +where + F: FnOnce(HandlerCtx) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Payload: RpcSend, + ErrData: RpcSend, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { + let id = args.req.id_owned(); + let ctx = args.ctx; + + Box::pin(async move { + let payload = self(ctx).await; + Response { + id, + payload: &payload, + } + .to_json() + }) + } +} + +impl Handler<(OutputResponsePayload, Params), S> for F +where + F: FnOnce(Params) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Params: RpcRecv, + Payload: RpcSend, + ErrData: RpcSend, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { + let HandlerArgs { req, .. } = args; + Box::pin(async move { + let id = req.id_owned(); + let Ok(params) = req.deser_params() else { + return Response::invalid_params(id); + }; + + let payload = self(params).await; + + Response { + id, + payload: &payload, + } + .to_json() + }) + } +} + +impl Handler<(OutputResponsePayload, HandlerCtx, Params), S> + for F +where + F: FnOnce(HandlerCtx, Params) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Params: RpcRecv, + Payload: RpcSend, + ErrData: RpcSend, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { + Box::pin(async move { + let HandlerArgs { ctx, req } = args; + + let id = req.id_owned(); + let Ok(params) = req.deser_params() else { + return Response::invalid_params(id); + }; + + drop(req); // deallocate explicitly. No funny business. + + let payload = self(ctx, params).await; + Response { + id, + payload: &payload, + } + .to_json() + }) + } +} + +impl Handler<(OutputResponsePayload, Params, S), S> for F +where + F: FnOnce(Params, S) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Params: RpcRecv, + Payload: RpcSend, + ErrData: RpcSend, + S: Send + Sync + 'static, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { + Box::pin(async move { + let HandlerArgs { req, .. } = args; + + let id = req.id_owned(); + let Ok(params) = req.deser_params() else { + return Response::invalid_params(id); + }; + drop(req); // deallocate explicitly. No funny business. + + Response { + id, + payload: &self(params, state).await, + } + .to_json() + }) + } +} + +impl Handler<(OutputResponsePayload, S, HandlerCtx), S> for F +where + F: FnOnce(HandlerCtx, S) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Payload: RpcSend, + ErrData: RpcSend, + S: Send + Sync + 'static, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { + Box::pin(async move { + let HandlerArgs { ctx, req } = args; + + let id = req.id_owned(); + + drop(req); // deallocate explicitly. No funny business. + + Response { + id, + payload: &self(ctx, state).await, + } + .to_json() + }) + } +} + +impl Handler<(OutputResponsePayload, HandlerCtx, Params, S), S> + for F +where + F: FnOnce(HandlerCtx, Params, S) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Params: RpcRecv, + Payload: RpcSend, + ErrData: RpcSend, + S: Send + Sync + 'static, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { + Box::pin(async move { + let HandlerArgs { ctx, req } = args; + + let id = req.id_owned(); + let Ok(params) = req.deser_params() else { + return Response::invalid_params(id); + }; + + drop(req); // deallocate explicitly. No funny business. + + Response { + id, + payload: &self(ctx, params, state).await, + } + .to_json() + }) + } +} + +impl Handler<(OutputResult,), S> for F +where + F: FnOnce() -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Payload: RpcSend, + ErrData: RpcSend, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { + let id = args.req.id_owned(); + drop(args); + Box::pin(async move { + Response { + id, + payload: &convert_result!(self().await), + } + .to_json() + }) + } +} + +impl Handler<(OutputResult, HandlerCtx), S> for F +where + F: FnOnce(HandlerCtx) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Payload: RpcSend, + ErrData: RpcSend, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { + let HandlerArgs { ctx, req } = args; + + let id = req.id_owned(); + + drop(req); + + Box::pin(async move { + Response { + id, + payload: &convert_result!(self(ctx).await), + } + .to_json() + }) + } +} + +impl Handler<(OutputResult, Params), S> for F +where + F: FnOnce(Params) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Params: RpcRecv, + Payload: RpcSend, + ErrData: RpcSend, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { + Box::pin(async move { + let HandlerArgs { req, .. } = args; + + let id = req.id_owned(); + let Ok(params) = req.deser_params() else { + return Response::invalid_params(id); + }; + + drop(req); // deallocate explicitly. No funny business. + + Response { + id, + payload: &convert_result!(self(params).await), + } + .to_json() + }) + } +} + +impl Handler<(OutputResult, HandlerCtx, Params), S> for F +where + F: FnOnce(HandlerCtx, Params) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Params: RpcRecv, + Payload: RpcSend, + ErrData: RpcSend, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, _state: S) -> Self::Future { + Box::pin(async move { + let HandlerArgs { ctx, req } = args; + + let id = req.id_owned(); + let Ok(params) = req.deser_params() else { + return Response::invalid_params(id); + }; + + drop(req); // deallocate explicitly. No funny business. + + Response { + id, + payload: &convert_result!(self(ctx, params).await), + } + .to_json() + }) + } +} + +impl Handler<(OutputResult, Params, S), S> for F +where + F: FnOnce(Params, S) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Params: RpcRecv, + Payload: RpcSend, + ErrData: RpcSend, + S: Send + Sync + 'static, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { + Box::pin(async move { + let HandlerArgs { req, .. } = args; + + let id = req.id_owned(); + let Ok(params) = req.deser_params() else { + return Response::invalid_params(id); + }; + + drop(req); // deallocate explicitly. No funny business. + + Response { + id, + payload: &convert_result!(self(params, state).await), + } + .to_json() + }) + } +} + +impl Handler<(OutputResult, S, HandlerCtx), S> for F +where + F: FnOnce(HandlerCtx, S) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Payload: RpcSend, + ErrData: RpcSend, + S: Send + Sync + 'static, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { + let HandlerArgs { ctx, req } = args; + + let id = req.id_owned(); + + drop(req); + + Box::pin(async move { + Response { + id, + payload: &convert_result!(self(ctx, state).await), + } + .to_json() + }) + } +} + +impl Handler<(OutputResult, HandlerCtx, Params, S), S> for F +where + F: FnOnce(HandlerCtx, Params, S) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Params: RpcRecv, + Payload: RpcSend, + ErrData: RpcSend, + S: Send + Sync + 'static, +{ + type Future = Pin> + Send>>; + + fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future { + Box::pin(async move { + let HandlerArgs { ctx, req } = args; + + let id = req.id_owned(); + let Ok(params) = req.deser_params() else { + return Response::invalid_params(id); + }; + + drop(req); // deallocate explicitly. No funny business. + + Response { + id, + payload: &convert_result!(self(ctx, params, state).await), + } + .to_json() + }) + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `axum` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/tokio-rs/axum/ +// +// The MIT License (MIT) +// +// Copyright (c) 2019 Axum Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/routes/method.rs b/src/routes/method.rs new file mode 100644 index 0000000..8421829 --- /dev/null +++ b/src/routes/method.rs @@ -0,0 +1,82 @@ +use crate::{routes::RouteFuture, BoxedIntoRoute, HandlerArgs, Route}; + +/// A method, which may be ready to handle requests or may need to be +/// initialized with some state. +/// +/// Analagous to axum's `MethodEndpoint` +pub(crate) enum Method { + /// A method that needs to be initialized with some state. + Needs(BoxedIntoRoute), + /// A method that is ready to handle requests. + Ready(Route), +} + +impl Clone for Method { + fn clone(&self) -> Self { + match self { + Self::Needs(handler) => Self::Needs(handler.clone()), + Self::Ready(route) => Self::Ready(route.clone()), + } + } +} + +impl Method { + /// Call the method with the given state and request. + pub(crate) fn call_with_state(&self, args: HandlerArgs, state: S) -> RouteFuture { + match self { + Self::Ready(route) => route.clone().oneshot_inner_owned(args), + Self::Needs(handler) => handler + .clone() + .0 + .into_route(state) + .oneshot_inner_owned(args), + } + } +} + +impl Method +where + S: Clone, +{ + /// Add state to a method, converting + pub(crate) fn with_state(self, state: &S) -> Method { + match self { + Self::Ready(route) => Method::Ready(route), + Self::Needs(handler) => Method::Ready(handler.0.into_route(state.clone())), + } + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `axum` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/tokio-rs/axum/blob/f84105ae8b078109987b089c47febc3b544e6b80/axum/src/routing/mod.rs#L119 +// +// The MIT License (MIT) +// +// Copyright (c) 2019 Axum Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/routes/mod.rs b/src/routes/mod.rs new file mode 100644 index 0000000..e7f7d83 --- /dev/null +++ b/src/routes/mod.rs @@ -0,0 +1,132 @@ +mod ctx; +pub use ctx::{HandlerArgs, HandlerCtx, NotifyError}; + +mod erased; +pub(crate) use erased::{BoxedIntoRoute, ErasedIntoRoute, MakeErasedHandler}; + +mod future; +pub use future::RouteFuture; + +mod handler; +pub use handler::Handler; +pub(crate) use handler::HandlerInternal; + +mod method; +pub(crate) use method::Method; + +use serde_json::value::RawValue; +use std::{ + convert::Infallible, + task::{Context, Poll}, +}; +use tower::{util::BoxCloneSyncService, Service, ServiceExt}; + +use crate::types::Response; + +/// A JSON-RPC handler for a specific method. +/// +/// A route is a [`BoxCloneSyncService`] that takes JSON parameters and returns +/// a boxed [`RawValue`]. Routes SHOULD be infallible. I.e. any error +/// that occurs during the handling of a request should be represented as a +/// JSON-RPC error response, rather than having the service return an `Err`. +#[derive(Debug)] +pub(crate) struct Route(tower::util::BoxCloneSyncService, Infallible>); + +impl Route { + /// Create a new route from a service. + pub(crate) fn new(inner: S) -> Self + where + S: Service, Error = Infallible> + + Clone + + Send + + Sync + + 'static, + S::Future: Send + 'static, + { + Self(BoxCloneSyncService::new(inner)) + } + + /// Create a default fallback route that returns a method not found error. + pub(crate) fn default_fallback() -> Self { + Self::new(tower::service_fn(|args: HandlerArgs| async { + let HandlerArgs { req, .. } = args; + let id = req.id_owned(); + drop(req); + + Ok(Response::method_not_found(id)) + })) + } + + /// Create a one-shot future for the given request. + pub(crate) fn oneshot_inner(&mut self, args: HandlerArgs) -> RouteFuture { + RouteFuture::new(self.0.clone().oneshot(args)) + } + + /// Variant of [`Route::oneshot_inner`] that takes ownership of the route to avoid cloning. + pub(crate) fn oneshot_inner_owned(self, args: HandlerArgs) -> RouteFuture { + RouteFuture::new(self.0.oneshot(args)) + } +} + +impl Clone for Route { + #[track_caller] + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl From, Infallible>> for Route { + fn from(inner: BoxCloneSyncService, Infallible>) -> Self { + Self(inner) + } +} + +impl Service for Route { + type Response = Box; + + type Error = Infallible; + + type Future = RouteFuture; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, args: HandlerArgs) -> Self::Future { + self.oneshot_inner(args) + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `axum` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/tokio-rs/axum/ +// +// The MIT License (MIT) +// +// Copyright (c) 2019 Axum Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/src/types/error.rs b/src/types/error.rs new file mode 100644 index 0000000..1e31c8a --- /dev/null +++ b/src/types/error.rs @@ -0,0 +1,31 @@ +/// Error when deserializing a request +#[derive(Debug, thiserror::Error)] +pub enum RequestError { + /// Invalid UTF-8 + #[error("Invalid UTF-8: {0}")] + InvalidUtf8(#[from] core::str::Utf8Error), + /// Invalid JSON + #[error("Invalid JSON: {0}")] + InvalidJson(#[from] serde_json::Error), + + /// Id is too large + /// + /// The limit is 80 bytes. 80 is selected as a reasonable limit for + /// most use-cases, and will hold UUIDs as well as 0x-prefixed 256-bit + /// hashes encoded as hex. If you need to send a large id, consider + /// not doing that. + #[error("Id is too large, limit of 80 bytes. Got: {0}")] + IdTooLarge(usize), + + /// Method is not a valid JSON string. + #[error("Method is not a valid JSON string.")] + InvalidMethod, + + /// Method is too large + /// + /// The limit is 80 bytes. 80 is selected as a reasonable limit for + /// most use-cases. If you need to send a large method name, consider + /// not doing that. + #[error("Method is too large, limit of 80 bytes. Got: {0}")] + MethodTooLarge(usize), +} diff --git a/src/types/mod.rs b/src/types/mod.rs new file mode 100644 index 0000000..70d581e --- /dev/null +++ b/src/types/mod.rs @@ -0,0 +1,14 @@ +//! Core types, like [`Request`] and [`Response`]. + +mod req; +pub(crate) use req::Request; + +mod resp; +pub(crate) use resp::Response; +pub use resp::{ErrorPayload, ResponsePayload}; + +mod error; +pub(crate) use error::RequestError; + +pub(crate) const ID_LEN_LIMIT: usize = 80; +pub(crate) const METHOD_LEN_LIMIT: usize = 80; diff --git a/src/types/req.rs b/src/types/req.rs new file mode 100644 index 0000000..5f53c89 --- /dev/null +++ b/src/types/req.rs @@ -0,0 +1,205 @@ +use crate::types::{RequestError, ID_LEN_LIMIT, METHOD_LEN_LIMIT}; +use bytes::Bytes; +use serde_json::value::RawValue; +use std::ops::Range; + +macro_rules! find_range { + ($bytes:expr, $rv:expr) => {{ + let rv = $rv.as_bytes(); + + let start = rv.as_ptr() as usize - $bytes.as_ptr() as usize; + let end = start + rv.len(); + + debug_assert_eq!(rv, &$bytes[start..end]); + + start..end + }}; +} + +/// Utf8 payload, partially deserialized +#[derive(Clone)] +pub struct Request { + /// The underlying byte buffer. This is guaranteed to be a validly + /// formatted JSON string. + bytes: Bytes, + + /// A range of the `bytes` field that represents the id field of the + /// JSON-RPC request. + /// + /// This is guaranteed to be an accessible, valid, portion of the `bytes` + /// property, containing validly-formatted JSON. + /// + /// This field is generated by deserializing to a [`RawValue`] and then + /// calculating the offset of the backing slice within the `bytes` field. + id: Range, + /// A range of the `bytes` field that represents the method field of the + /// JSON-RPC request. + /// + /// This is guaranteed to be an accessible, valid, portion of the `bytes` + /// property, containing validly-formatted JSON. + /// + /// This field is generated by deserializing to a [`RawValue`] and then + /// calculating the offset of the backing slice within the `bytes` field. + method: Range, + /// A range of the `bytes` field that represents the params field of the + /// JSON-RPC request. + /// + /// This is guaranteed to be an accessible, valid, portion of the `bytes` + /// property, containing validly-formatted JSON. + /// + /// This field is generated by deserializing to a [`RawValue`] and then + /// calculating the offset of the backing slice within the `bytes` field. + params: Range, +} + +impl core::fmt::Debug for Request { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + // SAFETY: both str pointers are guaranteed to slices of the owned + // `bytes` field. + + f.debug_struct("Request") + .field("bytes", &self.bytes.len()) + .field("method", &self.method) + .finish_non_exhaustive() + } +} + +#[derive(serde::Deserialize)] +struct DeserHelper<'a> { + #[serde(borrow)] + id: &'a RawValue, + #[serde(borrow)] + method: &'a RawValue, + #[serde(borrow)] + params: &'a RawValue, +} + +impl TryFrom for Request { + type Error = RequestError; + + fn try_from(bytes: Bytes) -> Result { + let DeserHelper { id, method, params } = serde_json::from_slice(bytes.as_ref())?; + + let id = find_range!(bytes, id.get()); + // Ensure the id is not too long + let id_len = id.end - id.start; + if id_len > ID_LEN_LIMIT { + return Err(RequestError::IdTooLarge(id_len)); + } + + // Ensure method is a string, and not too long, and trim the quotes + // from it + let method = method + .get() + .strip_prefix('"') + .and_then(|s| s.strip_suffix('"')) + .ok_or(RequestError::InvalidMethod)?; + let method = find_range!(bytes, method); + + let method_len = method.end - method.start; + if method_len > METHOD_LEN_LIMIT { + return Err(RequestError::MethodTooLarge(method_len)); + } + + let params = find_range!(bytes, params.get()); + + Ok(Self { + bytes, + id, + method, + params, + }) + } +} + +#[cfg(feature = "ws")] +impl TryFrom for Request { + type Error = RequestError; + + fn try_from(bytes: tokio_tungstenite::tungstenite::Utf8Bytes) -> Result { + Self::try_from(Bytes::from(bytes)) + } +} + +impl Request { + /// Return a reference to the serialized ID field. + pub fn id(&self) -> &str { + // SAFETY: `id` is guaranteed to be valid JSON, + // and a valid slice of `bytes`. + unsafe { core::str::from_utf8_unchecked(self.bytes.get_unchecked(self.id.clone())) } + } + + /// Return an owned version of the serialized ID field. + pub fn id_owned(&self) -> Box { + RawValue::from_string(self.id().to_string()).expect("valid json") + } + + /// Return a reference to the serialized method field. + pub fn method(&self) -> &str { + // SAFETY: `method` is guaranteed to be valid JSON, + // and a valid slice of `bytes`. + unsafe { core::str::from_utf8_unchecked(self.bytes.get_unchecked(self.method.clone())) } + } + + /// Return a reference to the serialized params field. + pub fn params(&self) -> &str { + // SAFETY: `params` is guaranteed to be valid JSON, + // and a valid slice of `bytes`. + unsafe { core::str::from_utf8_unchecked(self.bytes.get_unchecked(self.params.clone())) } + } + + /// Deserialize the params field into a type. + pub fn deser_params<'a: 'de, 'de, T: serde::Deserialize<'de>>( + &'a self, + ) -> serde_json::Result { + serde_json::from_str(self.params()) + } +} + +#[cfg(test)] +mod test { + use crate::types::METHOD_LEN_LIMIT; + + use super::*; + + #[test] + fn test_request() { + let bytes = Bytes::from_static(b"{\"id\":1,\"method\":\"foo\",\"params\":[]}"); + let req = Request::try_from(bytes).unwrap(); + + assert_eq!(req.id(), "1"); + assert_eq!(req.method(), r#"foo"#); + assert_eq!(req.params(), r#"[]"#); + } + + #[test] + fn non_utf8() { + let bytes = Bytes::from_static(b"{\"id\xFF\xFF\":1,\"method\":\"foo\",\"params\":[]}"); + let err = Request::try_from(bytes).unwrap_err(); + + assert!(matches!(err, RequestError::InvalidJson(_))); + assert!(err.to_string().contains("invalid unicode code point")); + } + + #[test] + fn too_large_id() { + let id = "a".repeat(ID_LEN_LIMIT + 1); + let bytes = Bytes::from(format!(r#"{{"id":"{}","method":"foo","params":[]}}"#, id)); + let RequestError::IdTooLarge(size) = Request::try_from(bytes).unwrap_err() else { + panic!("Expected RequestError::IdTooLarge") + }; + + assert_eq!(size, ID_LEN_LIMIT + 3); + } + + #[test] + fn too_large_method() { + let method = "a".repeat(METHOD_LEN_LIMIT + 1); + let bytes = Bytes::from(format!(r#"{{"id":1,"method":"{}","params":[]}}"#, method)); + let RequestError::MethodTooLarge(size) = Request::try_from(bytes).unwrap_err() else { + panic!("Expected RequestError::MethodTooLarge") + }; + + assert_eq!(size, METHOD_LEN_LIMIT + 1); + } +} diff --git a/src/types/resp.rs b/src/types/resp.rs new file mode 100644 index 0000000..0bf1a8f --- /dev/null +++ b/src/types/resp.rs @@ -0,0 +1,418 @@ +use crate::RpcSend; +use serde::{ser::SerializeMap, Serialize, Serializer}; +use serde_json::value::{to_raw_value, RawValue}; +use std::borrow::Cow; +use std::fmt; + +const INTERNAL_ERROR: Cow<'_, str> = Cow::Borrowed("Internal error"); + +/// Response struct. +#[derive(Debug, Clone)] +pub(crate) struct Response<'a, T, E> { + pub(crate) id: Box, + pub(crate) payload: &'a ResponsePayload, +} + +impl Response<'_, (), ()> { + /// Parse error response, used when the request is not valid JSON. + pub(crate) fn parse_error() -> Box { + Response::<(), ()> { + id: Default::default(), // NULL + payload: &ResponsePayload::parse_error(), + } + .to_json() + } + + /// Invalid params response, used when the params field does not + /// deserialize into the expected type. + pub(crate) fn invalid_params(id: Box) -> Box { + Response::<(), ()> { + id, + payload: &ResponsePayload::invalid_params(), + } + .to_json() + } + + /// Method not found response, used in default fallback handler. + pub(crate) fn method_not_found(id: Box) -> Box { + Response::<(), ()> { + id, + payload: &ResponsePayload::method_not_found(), + } + .to_json() + } + + /// Response failed to serialize + pub(crate) fn serialization_failure(id: &RawValue) -> Box { + RawValue::from_string(format!( + r#"{{"jsonrpc":"2.0","id":{},"error":{{"code":-32700,"message":"response serialization error"}}}}"#, + id.get() + )) + .expect("valid json") + } +} + +impl Response<'_, T, E> +where + T: Serialize, + E: Serialize, +{ + pub(crate) fn to_json(&self) -> Box { + serde_json::value::to_raw_value(self).unwrap_or_else(|err| { + tracing::debug!(%err, id = %self.id, "failed to serialize response"); + Response::serialization_failure(&self.id) + }) + } +} + +impl Serialize for Response<'_, T, E> +where + T: Serialize, + E: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(3))?; + map.serialize_entry("jsonrpc", "2.0")?; + map.serialize_entry("id", &self.id)?; + match &self.payload { + ResponsePayload::Success(result) => { + map.serialize_entry("result", result)?; + } + ResponsePayload::Failure(error) => { + map.serialize_entry("error", error)?; + } + } + map.end() + } +} + +/// A JSON-RPC 2.0 response payload. +/// +/// This enum covers both the success and error cases of a JSON-RPC 2.0 +/// response. It is used to represent the `result` and `error` fields of a +/// response object. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ResponsePayload { + /// A successful response payload. + Success(Payload), + /// An error response payload. + Failure(ErrorPayload), +} + +impl ResponsePayload { + /// Create a new error payload for a parse error. + pub const fn parse_error() -> Self { + Self::Failure(ErrorPayload::parse_error()) + } + + /// Create a new error payload for an invalid request. + pub const fn invalid_request() -> Self { + Self::Failure(ErrorPayload::invalid_request()) + } + + /// Create a new error payload for a method not found error. + pub const fn method_not_found() -> Self { + Self::Failure(ErrorPayload::method_not_found()) + } + + /// Create a new error payload for an invalid params error. + pub const fn invalid_params() -> Self { + Self::Failure(ErrorPayload::invalid_params()) + } + + /// Create a new error payload for an internal error. + pub const fn internal_error() -> Self { + Self::Failure(ErrorPayload::internal_error()) + } + + /// Create a new error payload for an internal error with a custom message. + pub const fn internal_error_message(message: Cow<'static, str>) -> Self { + Self::Failure(ErrorPayload::internal_error_message(message)) + } + + /// Create a new error payload for an internal error with a custom message + /// and additional data. + pub const fn internal_error_with_obj(data: ErrData) -> Self + where + ErrData: RpcSend, + { + Self::Failure(ErrorPayload::internal_error_with_obj(data)) + } + + /// Create a new error payload for an internal error with a custom message + /// and additional data. + pub const fn internal_error_with_message_and_obj( + message: Cow<'static, str>, + data: ErrData, + ) -> Self + where + ErrData: RpcSend, + { + Self::Failure(ErrorPayload::internal_error_with_message_and_obj( + message, data, + )) + } + + /// Fallible conversion to the successful payload. + pub const fn as_success(&self) -> Option<&Payload> { + match self { + Self::Success(payload) => Some(payload), + _ => None, + } + } + + /// Fallible conversion to the error object. + pub const fn as_error(&self) -> Option<&ErrorPayload> { + match self { + Self::Failure(payload) => Some(payload), + _ => None, + } + } + + /// Returns `true` if the response payload is a success. + pub const fn is_success(&self) -> bool { + matches!(self, Self::Success(_)) + } + + /// Returns `true` if the response payload is an error. + pub const fn is_error(&self) -> bool { + matches!(self, Self::Failure(_)) + } +} + +/// A JSON-RPC 2.0 error object. +/// +/// This response indicates that the server received and handled the request, +/// but that there was an error in the processing of it. The error should be +/// included in the `message` field of the response payload. +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] +pub struct ErrorPayload> { + /// The error code. + pub code: i64, + /// The error message (if any). + pub message: Cow<'static, str>, + /// The error data (if any). + pub data: Option, +} + +impl ErrorPayload { + /// Create a new error payload for a parse error. + pub const fn parse_error() -> Self { + Self { + code: -32700, + message: Cow::Borrowed("Parse error"), + data: None, + } + } + + /// Create a new error payload for an invalid request. + pub const fn invalid_request() -> Self { + Self { + code: -32600, + message: Cow::Borrowed("Invalid Request"), + data: None, + } + } + + /// Create a new error payload for a method not found error. + pub const fn method_not_found() -> Self { + Self { + code: -32601, + message: Cow::Borrowed("Method not found"), + data: None, + } + } + + /// Create a new error payload for an invalid params error. + pub const fn invalid_params() -> Self { + Self { + code: -32602, + message: Cow::Borrowed("Invalid params"), + data: None, + } + } + + /// Create a new error payload for an internal error. + pub const fn internal_error() -> Self { + Self { + code: -32603, + message: INTERNAL_ERROR, + data: None, + } + } + + /// Create a new error payload for an internal error with a custom message. + pub const fn internal_error_message(message: Cow<'static, str>) -> Self { + Self { + code: -32603, + message, + data: None, + } + } + + /// Create a new error payload for an internal error with a custom message + /// and additional data. + pub const fn internal_error_with_obj(data: E) -> Self + where + E: RpcSend, + { + Self { + code: -32603, + message: INTERNAL_ERROR, + data: Some(data), + } + } + + /// Create a new error payload for an internal error with a custom message + pub const fn internal_error_with_message_and_obj(message: Cow<'static, str>, data: E) -> Self + where + E: RpcSend, + { + Self { + code: -32603, + message, + data: Some(data), + } + } + + /// Analyzes the [ErrorPayload] and decides if the request should be + /// retried based on the error code or the message. + pub fn is_retry_err(&self) -> bool { + // alchemy throws it this way + if self.code == 429 { + return true; + } + + // This is an infura error code for `exceeded project rate limit` + if self.code == -32005 { + return true; + } + + // alternative alchemy error for specific IPs + if self.code == -32016 && self.message.contains("rate limit") { + return true; + } + + // quick node error `"credits limited to 6000/sec"` + // + if self.code == -32012 && self.message.contains("credits") { + return true; + } + + // quick node rate limit error: `100/second request limit reached - reduce calls per second + // or upgrade your account at quicknode.com` + if self.code == -32007 && self.message.contains("request limit reached") { + return true; + } + + match self.message.as_ref() { + // this is commonly thrown by infura and is apparently a load balancer issue, see also + "header not found" => true, + // also thrown by infura if out of budget for the day and ratelimited + "daily request count exceeded, request rate limited" => true, + msg => { + msg.contains("rate limit") + || msg.contains("rate exceeded") + || msg.contains("too many requests") + || msg.contains("credits limited") + || msg.contains("request limit") + } + } + } +} + +impl From for ErrorPayload +where + T: std::error::Error + RpcSend, +{ + fn from(value: T) -> Self { + Self { + code: -32603, + message: INTERNAL_ERROR, + data: Some(value), + } + } +} + +impl ErrorPayload +where + E: RpcSend, +{ + /// Serialize the inner data into a [`RawValue`]. + pub fn serialize_payload(&self) -> serde_json::Result { + Ok(ErrorPayload { + code: self.code, + message: self.message.clone(), + data: match self.data.as_ref() { + Some(data) => Some(to_raw_value(data)?), + None => None, + }, + }) + } +} + +impl fmt::Display for ErrorPayload { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "error code {}: {}{}", + self.code, + self.message, + self.data + .as_ref() + .map(|data| format!(", data: {}", data)) + .unwrap_or_default() + ) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::assert_rv_eq; + + #[test] + fn ser_failure() { + let id = RawValue::from_string("1".to_string()).unwrap(); + let res = Response::<(), ()>::serialization_failure(&id); + assert_rv_eq( + &res, + r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32700,"message":"response serialization error"}}"#, + ); + } +} + +// Some code is this file is reproduced under the terms of the MIT license. It +// originates from the `alloy` crate. The original source code can be found at +// the following URL, and the original license is included below. +// +// https://github.com/alloy-rs/alloy +// +// The MIT License (MIT) +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE.