From 926ad7bb2a3dd908321ad40da97331cb45363c89 Mon Sep 17 00:00:00 2001 From: Ivan Ukhov Date: Wed, 2 Oct 2024 15:18:58 +0200 Subject: [PATCH] Update google-apis-common --- google-apis-common/Cargo.toml | 30 ++++---- google-apis-common/src/auth.rs | 32 +++----- google-apis-common/src/lib.rs | 125 ++++++++++++++++---------------- google-apis-common/src/serde.rs | 9 +-- google-apis-common/src/url.rs | 11 ++- 5 files changed, 100 insertions(+), 107 deletions(-) diff --git a/google-apis-common/Cargo.toml b/google-apis-common/Cargo.toml index f136a6f4f15..e653220a03b 100644 --- a/google-apis-common/Cargo.toml +++ b/google-apis-common/Cargo.toml @@ -13,22 +13,20 @@ edition = "2021" [lib] doctest = false -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] -mime = "^ 0.3" -serde = { version = "^ 1.0", features = ["derive"] } -serde_with = "3.0" -serde_json = "^ 1.0" - base64 = "0.22" -chrono = { version = "0.4.35", default-features = false, features = ["clock", "serde"] } -url = "2.0" -percent-encoding = "2.0" - -yup-oauth2 = { version = "9", optional = true } +chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } +http = "1" +http-body-util = "0.1" +hyper = { version = "1", features = ["client", "http2"] } +hyper-util = "0.1" itertools = "0.13" -hyper = { version = "^ 0.14", features = ["client", "http2"] } -http = "^0.2" -tokio = { version = "^1.0", features = ["time"] } -tower-service = "^0.3.1" +mime = "0.3" +percent-encoding = "2" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +serde_with = "3" +tokio = { version = "1", features = ["time"] } +tower-service = "0.3" +url = "2" +yup-oauth2 = { version = "11", optional = true } diff --git a/google-apis-common/src/auth.rs b/google-apis-common/src/auth.rs index 942b839fdb8..6ab883550e7 100644 --- a/google-apis-common/src/auth.rs +++ b/google-apis-common/src/auth.rs @@ -24,15 +24,10 @@ //! //! # Example //! ```rust -//! use core::future::Future; -//! use core::pin::Pin; +//! use std::future::Future; +//! use std::pin::Pin; //! //! use google_apis_common::{GetToken, oauth2}; -//! -//! use http::Uri; -//! use hyper::client::connect::Connection; -//! use tokio::io::{AsyncRead, AsyncWrite}; -//! use tower_service::Service; //! use oauth2::authenticator::Authenticator; //! //! #[derive(Clone)] @@ -41,12 +36,9 @@ //! retries: usize, //! } //! -//! impl GetToken for AuthenticatorWithRetry +//! impl GetToken for AuthenticatorWithRetry //! where -//! S: Service + Clone + Send + Sync + 'static, -//! S::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, -//! S::Future: Send + Unpin + 'static, -//! S::Error: Into>, +//! C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static, //! { //! fn get_token<'a>( //! &'a self, @@ -71,6 +63,7 @@ //! [`oauth2`]: https://docs.rs/oauth2/latest/oauth2/ //! [`AccessToken`]: https://docs.rs/oauth2/latest/oauth2/struct.AccessToken.html //! [`Authenticator`]: yup_oauth2::authenticator::Authenticator + use std::future::Future; use std::pin::Pin; @@ -127,20 +120,13 @@ impl GetToken for NoToken { #[cfg(feature = "yup-oauth2")] mod yup_oauth2_impl { - use super::{GetToken, GetTokenOutput}; - - use http::Uri; - use hyper::client::connect::Connection; - use tokio::io::{AsyncRead, AsyncWrite}; - use tower_service::Service; use yup_oauth2::authenticator::Authenticator; - impl GetToken for Authenticator + use super::{GetToken, GetTokenOutput}; + + impl GetToken for Authenticator where - S: Service + Clone + Send + Sync + 'static, - S::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, - S::Future: Send + Unpin + 'static, - S::Error: Into>, + C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static, { fn get_token<'a>(&'a self, scopes: &'a [&str]) -> GetTokenOutput<'a> { Box::pin(async move { diff --git a/google-apis-common/src/lib.rs b/google-apis-common/src/lib.rs index a867ad77d97..94d2624bce3 100644 --- a/google-apis-common/src/lib.rs +++ b/google-apis-common/src/lib.rs @@ -4,25 +4,17 @@ pub mod serde; pub mod url; use std::error; -use std::error::Error as StdError; use std::fmt::{self, Display}; use std::io::{self, Cursor, Read, Seek, SeekFrom, Write}; use std::str::FromStr; use std::time::Duration; -use itertools::Itertools; - -use hyper::http::Uri; - use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT}; use hyper::Method; use hyper::StatusCode; - +use itertools::Itertools; use mime::Mime; - use serde_json as json; - -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::sleep; pub use auth::{GetToken, NoToken}; @@ -34,6 +26,9 @@ pub use yup_oauth2 as oauth2; const LINE_ENDING: &str = "\r\n"; +type Body = http_body_util::Full; +type Response = hyper::Response; + pub enum Retry { /// Signal you don't want to retry Abort, @@ -103,13 +98,14 @@ pub trait Delegate: Send { /// between various API calls. fn begin(&mut self, _info: MethodInfo) {} - /// Called whenever there is an [HttpError](hyper::Error), usually if there are network problems. + /// Called whenever there is an [HttpError](hyper_util::client::legacy::Error), usually if + /// there are network problems. /// /// If you choose to retry after a duration, the duration should be chosen using the /// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff). /// /// Return retry information. - fn http_error(&mut self, _err: &hyper::Error) -> Retry { + fn http_error(&mut self, _err: &hyper_util::client::legacy::Error) -> Retry { Retry::Abort } @@ -127,8 +123,8 @@ pub trait Delegate: Send { /// first place fn token( &mut self, - e: Box, - ) -> std::result::Result, Box> { + e: Box, + ) -> std::result::Result, Box> { Err(e) } @@ -178,11 +174,7 @@ pub trait Delegate: Send { /// /// If you choose to retry after a duration, the duration should be chosen using the /// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff). - fn http_failure( - &mut self, - _: &hyper::Response, - _err: Option, - ) -> Retry { + fn http_failure(&mut self, _: &Response, _err: Option) -> Retry { Retry::Abort } @@ -230,7 +222,7 @@ impl Delegate for DefaultDelegate {} #[derive(Debug)] pub enum Error { /// The http connection failed - HttpError(hyper::Error), + HttpError(hyper_util::client::legacy::Error), /// An attempt was made to upload a resource with size stored in field `.0` /// even though the maximum upload size is what is stored in field `.1`. @@ -245,7 +237,7 @@ pub enum Error { MissingAPIKey, /// We required a Token, but didn't get one from the Authenticator - MissingToken(Box), + MissingToken(Box), /// The delgate instructed to cancel the operation Cancelled, @@ -258,7 +250,7 @@ pub enum Error { JsonDecodeError(String, json::Error), /// Indicates an HTTP repsonse with a non-success status code - Failure(hyper::Response), + Failure(Response), /// An IO error occurred while reading a stream into memory Io(std::io::Error), @@ -575,15 +567,11 @@ impl RangeResponseHeader { } /// A utility type to perform a resumable upload from start to end. -pub struct ResumableUploadHelper<'a, A: 'a, S> +pub struct ResumableUploadHelper<'a, A: 'a, C> where - S: tower_service::Service + Clone + Send + Sync + 'static, - S::Response: - hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, - S::Future: Send + Unpin + 'static, - S::Error: Into>, + C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static, { - pub client: &'a hyper::client::Client, + pub client: &'a hyper_util::client::legacy::Client, pub delegate: &'a mut dyn Delegate, pub start_at: Option, pub auth: &'a A, @@ -594,17 +582,15 @@ where pub media_type: Mime, pub content_length: u64, } -impl<'a, A, S> ResumableUploadHelper<'a, A, S> + +impl<'a, A, C> ResumableUploadHelper<'a, A, C> where - S: tower_service::Service + Clone + Send + Sync + 'static, - S::Response: - hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, - S::Future: Send + Unpin + 'static, - S::Error: Into>, + C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static, { async fn query_transfer_status( &mut self, - ) -> std::result::Result>> { + ) -> std::result::Result> + { loop { match self .client @@ -622,7 +608,7 @@ where .header_value(), ) .header(AUTHORIZATION, self.auth_header.clone()) - .body(hyper::body::Body::empty()) + .body(Default::default()) .unwrap(), ) .await @@ -635,11 +621,14 @@ where RangeResponseHeader::from_bytes(hh.as_bytes()) } None | Some(_) => { - if let Retry::After(d) = self.delegate.http_failure(&r, None) { + let (parts, body) = r.into_parts(); + let body = Body::new(to_bytes(body).await.unwrap_or_default()); + let response = Response::from_parts(parts, body); + if let Retry::After(d) = self.delegate.http_failure(&response, None) { sleep(d).await; continue; } - return Err(Ok(r)); + return Err(Ok(response)); } }; return Ok(h.0.last); @@ -658,7 +647,9 @@ where /// returns None if operation was cancelled by delegate, or the HttpResult. /// It can be that we return the result just because we didn't understand the status code - /// caller should check for status himself before assuming it's OK to use - pub async fn upload(&mut self) -> Option>> { + pub async fn upload( + &mut self, + ) -> Option> { let mut start = match self.start_at { Some(s) => s, None => match self.query_transfer_status().await { @@ -694,7 +685,7 @@ where if self.delegate.cancel_chunk_upload(&range_header) { return None; } - let res = self + match self .client .request( hyper::Request::builder() @@ -703,37 +694,38 @@ where .header("Content-Range", range_header.header_value()) .header(CONTENT_TYPE, format!("{}", self.media_type)) .header(USER_AGENT, self.user_agent.to_string()) - .body(hyper::body::Body::from(req_bytes)) + .body(Body::new(hyper::body::Bytes::from(req_bytes))) .unwrap(), ) - .await; - match res { - Ok(res) => { + .await + { + Ok(response) => { start += request_size; - if res.status() == StatusCode::PERMANENT_REDIRECT { + if response.status() == StatusCode::PERMANENT_REDIRECT { continue; } - let (res_parts, res_body) = res.into_parts(); - let res_body = match hyper::body::to_bytes(res_body).await { - Ok(res_body) => res_body.into_iter().collect(), - Err(err) => return Some(Err(err)), - }; - let res_body_string: String = String::from_utf8(res_body).unwrap(); - let reconstructed_result = - hyper::Response::from_parts(res_parts, res_body_string.clone().into()); - - if !reconstructed_result.status().is_success() { + let (parts, body) = response.into_parts(); + let body = to_string(body).await.ok(); + let response = Response::from_parts( + parts, + body.clone() + .map(hyper::body::Bytes::from) + .map(Body::new) + .unwrap_or_default(), + ); + + if !response.status().is_success() { if let Retry::After(d) = self.delegate.http_failure( - &reconstructed_result, - json::from_str(&res_body_string).ok(), + &response, + body.as_ref().and_then(|value| json::from_str(value).ok()), ) { sleep(d).await; continue; } } - return Some(Ok(reconstructed_result)); + return Some(Ok(response)); } Err(err) => { if let Retry::After(d) = self.delegate.http_error(&err) { @@ -763,10 +755,19 @@ pub fn remove_json_null_values(value: &mut json::value::Value) { } // Borrowing the body object as mutable and converts it to a string -pub async fn get_body_as_string(res_body: &mut hyper::Body) -> String { - let res_body_buf = hyper::body::to_bytes(res_body).await.unwrap(); - let res_body_string = String::from_utf8_lossy(&res_body_buf); - res_body_string.to_string() +pub async fn get_body_as_string(body: T) -> String { + to_string(body).await.unwrap_or_default() +} + +async fn to_bytes( + body: T, +) -> std::result::Result { + use http_body_util::BodyExt; + Ok(body.collect().await?.to_bytes()) +} + +async fn to_string(body: T) -> std::result::Result { + Ok(String::from_utf8_lossy(&to_bytes(body).await?).to_string()) } #[cfg(test)] diff --git a/google-apis-common/src/serde.rs b/google-apis-common/src/serde.rs index 8d95afe87c4..8fd19a512ff 100644 --- a/google-apis-common/src/serde.rs +++ b/google-apis-common/src/serde.rs @@ -174,8 +174,8 @@ pub mod standard_base64 { Ok(decoded) => Ok(decoded), Err(first_err) => match base64::prelude::BASE64_URL_SAFE.decode(s.as_ref()) { Ok(decoded) => Ok(decoded), - Err(_) => Err(serde::de::Error::custom(first_err)) - } + Err(_) => Err(serde::de::Error::custom(first_err)), + }, } } } @@ -212,8 +212,8 @@ pub mod urlsafe_base64 { Ok(decoded) => Ok(decoded), Err(first_err) => match base64::prelude::BASE64_STANDARD.decode(s.as_ref()) { Ok(decoded) => Ok(decoded), - Err(_) => Err(serde::de::Error::custom(first_err)) - } + Err(_) => Err(serde::de::Error::custom(first_err)), + }, } } } @@ -226,7 +226,6 @@ pub fn datetime_to_string(datetime: &chrono::DateTime) -> S #[cfg(test)] mod test { use super::{duration, standard_base64, urlsafe_base64}; - use base64::Engine as _; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; diff --git a/google-apis-common/src/url.rs b/google-apis-common/src/url.rs index 3da0b3c7bcf..d27acdd0cbf 100644 --- a/google-apis-common/src/url.rs +++ b/google-apis-common/src/url.rs @@ -40,7 +40,16 @@ impl<'a> Params<'a> { from: &str, url_encode: bool, ) -> String { - const DEFAULT_ENCODE_SET: &AsciiSet = &CONTROLS.add(b' ').add(b'"').add(b'#').add(b'<').add(b'>').add(b'`').add(b'?').add(b'{').add(b'}'); + const DEFAULT_ENCODE_SET: &AsciiSet = &CONTROLS + .add(b' ') + .add(b'"') + .add(b'#') + .add(b'<') + .add(b'>') + .add(b'`') + .add(b'?') + .add(b'{') + .add(b'}'); if url_encode { let mut replace_with: Cow = self.get(param).unwrap_or_default().into(); if from.as_bytes()[1] == b'+' {