Skip to content

Commit

Permalink
refactor: Remove lifetime from Context, ResponseFuture, InterceptorFu…
Browse files Browse the repository at this point in the history
…ture (#405)

This PR will fix #403

The benchmark seems can't reflect the actual improvement. (I guess the test case(64K) is not suitable to measure the different between an `Arc::new`)

Before:

```shell
download 64K: isahc     time:   [201.80 ms 204.40 ms 207.00 ms]
```

After

```shell
download 64K: isahc     time:   [200.32 ms 203.57 ms 206.90 ms]
```

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Nov 7, 2022
1 parent ddda137 commit 7a56ada
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 206 deletions.
189 changes: 95 additions & 94 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
error::{Error, ErrorKind},
handler::{RequestHandler, ResponseBodyReader},
headers::HasHeaders,
interceptor::{self, Interceptor, InterceptorObj},
interceptor::{self, Interceptor, InterceptorFuture, InterceptorObj},
parsing::header_to_curl_string,
util::future::FutureExt,
};
Expand Down Expand Up @@ -650,6 +650,11 @@ impl HttpClient {
self.inner.cookie_jar.as_ref()
}

/// Get the configured interceptors for this HTTP client.
pub(crate) fn interceptors(&self) -> &[InterceptorObj] {
&self.inner.interceptors
}

/// Send a GET request to the given URI.
///
/// To customize the request further, see [`HttpClient::send`]. To execute
Expand Down Expand Up @@ -681,7 +686,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::get`].
pub fn get_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn get_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -723,7 +728,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::head`].
pub fn head_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn head_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -769,7 +774,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::post`].
pub fn post_async<U, B>(&self, uri: U, body: B) -> ResponseFuture<'_>
pub fn post_async<U, B>(&self, uri: U, body: B) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -817,7 +822,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::put`].
pub fn put_async<U, B>(&self, uri: U, body: B) -> ResponseFuture<'_>
pub fn put_async<U, B>(&self, uri: U, body: B) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -849,7 +854,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::delete`].
pub fn delete_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn delete_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -995,7 +1000,7 @@ impl HttpClient {
/// # Ok(()) }
/// ```
#[inline]
pub fn send_async<B>(&self, request: Request<B>) -> ResponseFuture<'_>
pub fn send_async<B>(&self, request: Request<B>) -> ResponseFuture
where
B: Into<AsyncBody>,
{
Expand All @@ -1012,10 +1017,7 @@ impl HttpClient {
}

/// Actually send the request. All the public methods go through here.
async fn send_async_inner(
&self,
mut request: Request<AsyncBody>,
) -> Result<Response<AsyncBody>, Error> {
fn send_async_inner(&self, mut request: Request<AsyncBody>) -> InterceptorFuture<Error> {
// Populate request config, creating if necessary.
if let Some(config) = request.extensions_mut().get_mut::<RequestConfig>() {
// Merge request configuration with defaults.
Expand All @@ -1027,11 +1029,85 @@ impl HttpClient {
}

let ctx = interceptor::Context {
invoker: Arc::new(self),
interceptors: &self.inner.interceptors,
client: self.clone(),
interceptor_offset: 0,
};

ctx.send(request).await
ctx.send(request)
}

pub(crate) fn invoke(&self, mut request: Request<AsyncBody>) -> InterceptorFuture<Error> {
let client = self.clone();

Box::pin(async move {
let is_head_request = request.method() == http::Method::HEAD;

// Set default user agent if not specified.
request
.headers_mut()
.entry(http::header::USER_AGENT)
.or_insert(USER_AGENT.parse().unwrap());

// Check if automatic decompression is enabled; we'll need to know
// this later after the response is sent.
let is_automatic_decompression = request
.extensions()
.get::<RequestConfig>()
.unwrap()
.automatic_decompression
.unwrap_or(false);

// Create and configure a curl easy handle to fulfil the request.
let (easy, future) = client
.create_easy_handle(request)
.map_err(Error::from_any)?;

// Send the request to the agent to be executed.
client.inner.agent.submit_request(easy)?;

// Await for the response headers.
let response = future.await?;

// If a Content-Length header is present, include that information in
// the body as well.
let body_len = response.content_length().filter(|_| {
// If automatic decompression is enabled, and will likely be
// selected, then the value of Content-Length does not indicate
// the uncompressed body length and merely the compressed data
// length. If it looks like we are in this scenario then we
// ignore the Content-Length, since it can only cause confusion
// when included with the body.
if is_automatic_decompression {
if let Some(value) = response.headers().get(http::header::CONTENT_ENCODING) {
if value != "identity" {
return false;
}
}
}

true
});

// Convert the reader into an opaque Body.
Ok(response.map(|reader| {
if is_head_request {
AsyncBody::empty()
} else {
let body = ResponseBody {
inner: reader,
// Extend the lifetime of the agent by including a reference
// to its handle in the response body.
_client: client,
};

if let Some(len) = body_len {
AsyncBody::from_reader_sized(body, len)
} else {
AsyncBody::from_reader(body)
}
}
}))
})
}

fn create_easy_handle(
Expand Down Expand Up @@ -1155,81 +1231,6 @@ impl HttpClient {
}
}

impl crate::interceptor::Invoke for &HttpClient {
fn invoke(
&self,
mut request: Request<AsyncBody>,
) -> crate::interceptor::InterceptorFuture<'_, Error> {
Box::pin(async move {
let is_head_request = request.method() == http::Method::HEAD;

// Set default user agent if not specified.
request
.headers_mut()
.entry(http::header::USER_AGENT)
.or_insert(USER_AGENT.parse().unwrap());

// Check if automatic decompression is enabled; we'll need to know
// this later after the response is sent.
let is_automatic_decompression = request
.extensions()
.get::<RequestConfig>()
.unwrap()
.automatic_decompression
.unwrap_or(false);

// Create and configure a curl easy handle to fulfil the request.
let (easy, future) = self.create_easy_handle(request).map_err(Error::from_any)?;

// Send the request to the agent to be executed.
self.inner.agent.submit_request(easy)?;

// Await for the response headers.
let response = future.await?;

// If a Content-Length header is present, include that information in
// the body as well.
let body_len = response.content_length().filter(|_| {
// If automatic decompression is enabled, and will likely be
// selected, then the value of Content-Length does not indicate
// the uncompressed body length and merely the compressed data
// length. If it looks like we are in this scenario then we
// ignore the Content-Length, since it can only cause confusion
// when included with the body.
if is_automatic_decompression {
if let Some(value) = response.headers().get(http::header::CONTENT_ENCODING) {
if value != "identity" {
return false;
}
}
}

true
});

// Convert the reader into an opaque Body.
Ok(response.map(|reader| {
if is_head_request {
AsyncBody::empty()
} else {
let body = ResponseBody {
inner: reader,
// Extend the lifetime of the agent by including a reference
// to its handle in the response body.
_client: (*self).clone(),
};

if let Some(len) = body_len {
AsyncBody::from_reader_sized(body, len)
} else {
AsyncBody::from_reader(body)
}
}
}))
})
}
}

impl fmt::Debug for HttpClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HttpClient").finish()
Expand All @@ -1238,12 +1239,12 @@ impl fmt::Debug for HttpClient {

/// A future for a request being executed.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ResponseFuture<'c>(Pin<Box<dyn Future<Output = <Self as Future>::Output> + 'c + Send>>);
pub struct ResponseFuture(Pin<Box<dyn Future<Output = <Self as Future>::Output> + 'static + Send>>);

impl<'c> ResponseFuture<'c> {
impl ResponseFuture {
fn new<F>(future: F) -> Self
where
F: Future<Output = <Self as Future>::Output> + Send + 'c,
F: Future<Output = <Self as Future>::Output> + Send + 'static,
{
ResponseFuture(Box::pin(future))
}
Expand All @@ -1253,15 +1254,15 @@ impl<'c> ResponseFuture<'c> {
}
}

impl Future for ResponseFuture<'_> {
impl Future for ResponseFuture {
type Output = Result<Response<AsyncBody>, Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.as_mut().poll(cx)
}
}

impl<'c> fmt::Debug for ResponseFuture<'c> {
impl fmt::Debug for ResponseFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ResponseFuture").finish()
}
Expand Down
70 changes: 35 additions & 35 deletions src/cookies/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,49 @@ impl CookieInterceptor {
impl Interceptor for CookieInterceptor {
type Err = Error;

fn intercept<'a>(
&'a self,
fn intercept(
&self,
mut request: Request<AsyncBody>,
ctx: Context<'a>,
) -> InterceptorFuture<'a, Self::Err> {
Box::pin(async move {
// Determine the cookie jar to use for this request. If one is
// attached to this specific request, use it, otherwise use the
// default one.
let jar = request
.extensions()
.get::<CookieJar>()
.cloned()
.or_else(|| self.cookie_jar.clone());

if let Some(jar) = jar.as_ref() {
// Get the outgoing cookie header.
let mut cookie_string = request
.headers_mut()
.remove(http::header::COOKIE)
.map(|value| value.as_bytes().to_vec())
.unwrap_or_default();
ctx: Context,
) -> InterceptorFuture<Self::Err> {
// Determine the cookie jar to use for this request. If one is
// attached to this specific request, use it, otherwise use the
// default one.
let jar = request
.extensions()
.get::<CookieJar>()
.cloned()
.or_else(|| self.cookie_jar.clone());

// Append cookies in the jar to the cookie header value.
for cookie in jar.get_for_uri(request.uri()) {
if !cookie_string.is_empty() {
cookie_string.extend_from_slice(b"; ");
}
if let Some(jar) = jar.as_ref() {
// Get the outgoing cookie header.
let mut cookie_string = request
.headers_mut()
.remove(http::header::COOKIE)
.map(|value| value.as_bytes().to_vec())
.unwrap_or_default();

cookie_string.extend_from_slice(cookie.name().as_bytes());
cookie_string.push(b'=');
cookie_string.extend_from_slice(cookie.value().as_bytes());
// Append cookies in the jar to the cookie header value.
for cookie in jar.get_for_uri(request.uri()) {
if !cookie_string.is_empty() {
cookie_string.extend_from_slice(b"; ");
}

if !cookie_string.is_empty() {
if let Ok(header_value) = cookie_string.try_into() {
request
.headers_mut()
.insert(http::header::COOKIE, header_value);
}
cookie_string.extend_from_slice(cookie.name().as_bytes());
cookie_string.push(b'=');
cookie_string.extend_from_slice(cookie.value().as_bytes());
}

if !cookie_string.is_empty() {
if let Ok(header_value) = cookie_string.try_into() {
request
.headers_mut()
.insert(http::header::COOKIE, header_value);
}
}
}

Box::pin(async move {
let request_uri = request.uri().clone();
let mut response = ctx.send(request).await?;

Expand Down
Loading

0 comments on commit 7a56ada

Please sign in to comment.