Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reqwest example #222

Closed
Threated opened this issue Feb 12, 2025 · 2 comments
Closed

reqwest example #222

Threated opened this issue Feb 12, 2025 · 2 comments

Comments

@Threated
Copy link
Contributor

I feel like its really common that one host service uses reqwest to communicate with another as its the most popular http client. It prohibits a lot of the use cases of using turmoil for simulating service to service communication as its annoying to replace it with a plain http connector like in the axum example.

But looking at seanmonstar/reqwest#2496 inspired me to look into making it happen with a regular looking reqwest::Client.
And it works! It is unsound as it relies on the vtable layout of a sealed trait being the same as my user defined trait but as long as the internal trait stays the same it should work.

The code is similar to the fake client from the axum example with the wrapper io type:

mod connector {
    use hyper::rt::{Read, Write};
    use hyper_util::client::legacy::connect::Connection;
    use pin_project_lite::pin_project;
    use reqwest::Client;
    use std::{io::Error, pin::Pin};
    use tokio::io::AsyncWrite;
    use tower::util::BoxCloneSyncService;
    use turmoil::ToSocketAddrs;
    use turmoil::net::TcpStream;

    impl TurmoilConnection {
        pub async fn new(addr: impl ToSocketAddrs) -> std::io::Result<Self> {
            TcpStream::connect(addr).await.map(|s| Self { fut: s })
        }
    }

    pin_project! {
        struct TurmoilConnection {
            #[pin]
            fut: turmoil::net::TcpStream
        }
    }

    impl hyper::rt::Read for TurmoilConnection {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            mut buf: hyper::rt::ReadBufCursor<'_>,
        ) -> std::task::Poll<Result<(), Error>> {
            let n = unsafe {
                let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
                let result = tokio::io::AsyncRead::poll_read(self.project().fut, cx, &mut tbuf);
                match result {
                    std::task::Poll::Ready(Ok(())) => tbuf.filled().len(),
                    other => return other,
                }
            };

            unsafe {
                buf.advance(n);
            }
            std::task::Poll::Ready(Ok(()))
        }
    }

    impl hyper::rt::Write for TurmoilConnection {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &[u8],
        ) -> std::task::Poll<Result<usize, Error>> {
            Pin::new(&mut self.fut).poll_write(cx, buf)
        }

        fn poll_flush(
            mut self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Result<(), Error>> {
            Pin::new(&mut self.fut).poll_flush(cx)
        }

        fn poll_shutdown(
            mut self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Result<(), Error>> {
            Pin::new(&mut self.fut).poll_shutdown(cx)
        }
    }

    impl hyper_util::client::legacy::connect::Connection for TurmoilConnection {
        fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
            hyper_util::client::legacy::connect::Connected::new()
        }
    }

    /// Copy of https://github.com/seanmonstar/reqwest/blob/37074368012ce42e61e5649c2fffcf8c8a979e1e/src/connect.rs#L737
    trait AsyncConn: Read + Write + Connection + Send + Sync + Unpin + 'static {}

    impl<T: Read + Write + Connection + Send + Sync + Unpin + 'static> AsyncConn for T {}

    /// Copy of https://github.com/seanmonstar/reqwest/blob/37074368012ce42e61e5649c2fffcf8c8a979e1e/src/connect.rs#L767
    #[allow(dead_code)]
    struct FakeConn {
        conn: Box<dyn AsyncConn>,
        is_proxy: bool,
        tls_info: bool,
    }

    pub fn fake_client() -> Client {
        Client::builder()
            .connector_layer(tower::layer::layer_fn(
                |_inner: BoxCloneSyncService<_, _, _>| {
                    tower::service_fn(move |req| async {
                        let uri: hyper::Uri = unsafe { std::mem::transmute(req) };
                        let conn = Box::new(
                            TurmoilConnection::new(uri.authority().unwrap().as_str()).await?,
                        );
                        Ok(unsafe {
                            std::mem::transmute(FakeConn {
                                conn,
                                is_proxy: false,
                                tls_info: false,
                            })
                        })
                    })
                },
            ))
            .build()
            .unwrap()
    }
}

Should I create an example for this? Or is this too hacky?

@mcches
Copy link
Contributor

mcches commented Feb 14, 2025

This seems pretty hacky. It's probably worth considering exposing a connector method in reqwest, similar to what tonic does: https://docs.rs/tonic/latest/tonic/transport/struct.Endpoint.html#method.connect_with_connector

@Threated
Copy link
Contributor Author

Yeah I think that's what they are going for with the connector_layer function they are just unsure about what they want to expose to the api. There was some discussion about it in PR I linked that added it. Maybe we will get a nice API for it in the future

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants