Skip to content

Commit

Permalink
Update google-apis-common
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanUkhov committed Oct 2, 2024
1 parent 6993a8f commit 926ad7b
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 107 deletions.
30 changes: 14 additions & 16 deletions google-apis-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,20 @@ edition = "2021"
[lib]
doctest = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mime = "^ 0.3"
serde = { version = "^ 1.0", features = ["derive"] }
serde_with = "3.0"
serde_json = "^ 1.0"

base64 = "0.22"
chrono = { version = "0.4.35", default-features = false, features = ["clock", "serde"] }
url = "2.0"
percent-encoding = "2.0"

yup-oauth2 = { version = "9", optional = true }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
http = "1"
http-body-util = "0.1"
hyper = { version = "1", features = ["client", "http2"] }
hyper-util = "0.1"
itertools = "0.13"
hyper = { version = "^ 0.14", features = ["client", "http2"] }
http = "^0.2"
tokio = { version = "^1.0", features = ["time"] }
tower-service = "^0.3.1"
mime = "0.3"
percent-encoding = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = "3"
tokio = { version = "1", features = ["time"] }
tower-service = "0.3"
url = "2"
yup-oauth2 = { version = "11", optional = true }
32 changes: 9 additions & 23 deletions google-apis-common/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,10 @@
//!
//! # Example
//! ```rust
//! use core::future::Future;
//! use core::pin::Pin;
//! use std::future::Future;
//! use std::pin::Pin;
//!
//! use google_apis_common::{GetToken, oauth2};
//!
//! use http::Uri;
//! use hyper::client::connect::Connection;
//! use tokio::io::{AsyncRead, AsyncWrite};
//! use tower_service::Service;
//! use oauth2::authenticator::Authenticator;
//!
//! #[derive(Clone)]
Expand All @@ -41,12 +36,9 @@
//! retries: usize,
//! }
//!
//! impl<S> GetToken for AuthenticatorWithRetry<S>
//! impl<C> GetToken for AuthenticatorWithRetry<C>
//! where
//! S: Service<Uri> + Clone + Send + Sync + 'static,
//! S::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
//! S::Future: Send + Unpin + 'static,
//! S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
//! C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static,
//! {
//! fn get_token<'a>(
//! &'a self,
Expand All @@ -71,6 +63,7 @@
//! [`oauth2`]: https://docs.rs/oauth2/latest/oauth2/
//! [`AccessToken`]: https://docs.rs/oauth2/latest/oauth2/struct.AccessToken.html
//! [`Authenticator`]: yup_oauth2::authenticator::Authenticator
use std::future::Future;
use std::pin::Pin;

Expand Down Expand Up @@ -127,20 +120,13 @@ impl GetToken for NoToken {

#[cfg(feature = "yup-oauth2")]
mod yup_oauth2_impl {
use super::{GetToken, GetTokenOutput};

use http::Uri;
use hyper::client::connect::Connection;
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service;
use yup_oauth2::authenticator::Authenticator;

impl<S> GetToken for Authenticator<S>
use super::{GetToken, GetTokenOutput};

impl<C> GetToken for Authenticator<C>
where
S: Service<Uri> + Clone + Send + Sync + 'static,
S::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
S::Future: Send + Unpin + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static,
{
fn get_token<'a>(&'a self, scopes: &'a [&str]) -> GetTokenOutput<'a> {
Box::pin(async move {
Expand Down
125 changes: 63 additions & 62 deletions google-apis-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,17 @@ pub mod serde;
pub mod url;

use std::error;
use std::error::Error as StdError;
use std::fmt::{self, Display};
use std::io::{self, Cursor, Read, Seek, SeekFrom, Write};
use std::str::FromStr;
use std::time::Duration;

use itertools::Itertools;

use hyper::http::Uri;

use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT};
use hyper::Method;
use hyper::StatusCode;

use itertools::Itertools;
use mime::Mime;

use serde_json as json;

use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::sleep;

pub use auth::{GetToken, NoToken};
Expand All @@ -34,6 +26,9 @@ pub use yup_oauth2 as oauth2;

const LINE_ENDING: &str = "\r\n";

type Body = http_body_util::Full<hyper::body::Bytes>;
type Response = hyper::Response<Body>;

pub enum Retry {
/// Signal you don't want to retry
Abort,
Expand Down Expand Up @@ -103,13 +98,14 @@ pub trait Delegate: Send {
/// between various API calls.
fn begin(&mut self, _info: MethodInfo) {}

/// Called whenever there is an [HttpError](hyper::Error), usually if there are network problems.
/// Called whenever there is an [HttpError](hyper_util::client::legacy::Error), usually if
/// there are network problems.
///
/// If you choose to retry after a duration, the duration should be chosen using the
/// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff).
///
/// Return retry information.
fn http_error(&mut self, _err: &hyper::Error) -> Retry {
fn http_error(&mut self, _err: &hyper_util::client::legacy::Error) -> Retry {
Retry::Abort
}

Expand All @@ -127,8 +123,8 @@ pub trait Delegate: Send {
/// first place
fn token(
&mut self,
e: Box<dyn StdError + Send + Sync>,
) -> std::result::Result<Option<String>, Box<dyn StdError + Send + Sync>> {
e: Box<dyn std::error::Error + Send + Sync>,
) -> std::result::Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
Err(e)
}

Expand Down Expand Up @@ -178,11 +174,7 @@ pub trait Delegate: Send {
///
/// If you choose to retry after a duration, the duration should be chosen using the
/// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff).
fn http_failure(
&mut self,
_: &hyper::Response<hyper::body::Body>,
_err: Option<serde_json::Value>,
) -> Retry {
fn http_failure(&mut self, _: &Response, _err: Option<serde_json::Value>) -> Retry {
Retry::Abort
}

Expand Down Expand Up @@ -230,7 +222,7 @@ impl Delegate for DefaultDelegate {}
#[derive(Debug)]
pub enum Error {
/// The http connection failed
HttpError(hyper::Error),
HttpError(hyper_util::client::legacy::Error),

/// An attempt was made to upload a resource with size stored in field `.0`
/// even though the maximum upload size is what is stored in field `.1`.
Expand All @@ -245,7 +237,7 @@ pub enum Error {
MissingAPIKey,

/// We required a Token, but didn't get one from the Authenticator
MissingToken(Box<dyn StdError + Send + Sync>),
MissingToken(Box<dyn std::error::Error + Send + Sync>),

/// The delgate instructed to cancel the operation
Cancelled,
Expand All @@ -258,7 +250,7 @@ pub enum Error {
JsonDecodeError(String, json::Error),

/// Indicates an HTTP repsonse with a non-success status code
Failure(hyper::Response<hyper::body::Body>),
Failure(Response),

/// An IO error occurred while reading a stream into memory
Io(std::io::Error),
Expand Down Expand Up @@ -575,15 +567,11 @@ impl RangeResponseHeader {
}

/// A utility type to perform a resumable upload from start to end.
pub struct ResumableUploadHelper<'a, A: 'a, S>
pub struct ResumableUploadHelper<'a, A: 'a, C>
where
S: tower_service::Service<Uri> + Clone + Send + Sync + 'static,
S::Response:
hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
S::Future: Send + Unpin + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static,
{
pub client: &'a hyper::client::Client<S, hyper::body::Body>,
pub client: &'a hyper_util::client::legacy::Client<C, Body>,
pub delegate: &'a mut dyn Delegate,
pub start_at: Option<u64>,
pub auth: &'a A,
Expand All @@ -594,17 +582,15 @@ where
pub media_type: Mime,
pub content_length: u64,
}
impl<'a, A, S> ResumableUploadHelper<'a, A, S>

impl<'a, A, C> ResumableUploadHelper<'a, A, C>
where
S: tower_service::Service<Uri> + Clone + Send + Sync + 'static,
S::Response:
hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
S::Future: Send + Unpin + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static,
{
async fn query_transfer_status(
&mut self,
) -> std::result::Result<u64, hyper::Result<hyper::Response<hyper::body::Body>>> {
) -> std::result::Result<u64, std::result::Result<Response, hyper_util::client::legacy::Error>>
{
loop {
match self
.client
Expand All @@ -622,7 +608,7 @@ where
.header_value(),
)
.header(AUTHORIZATION, self.auth_header.clone())
.body(hyper::body::Body::empty())
.body(Default::default())
.unwrap(),
)
.await
Expand All @@ -635,11 +621,14 @@ where
RangeResponseHeader::from_bytes(hh.as_bytes())
}
None | Some(_) => {
if let Retry::After(d) = self.delegate.http_failure(&r, None) {
let (parts, body) = r.into_parts();
let body = Body::new(to_bytes(body).await.unwrap_or_default());
let response = Response::from_parts(parts, body);
if let Retry::After(d) = self.delegate.http_failure(&response, None) {
sleep(d).await;
continue;
}
return Err(Ok(r));
return Err(Ok(response));
}
};
return Ok(h.0.last);
Expand All @@ -658,7 +647,9 @@ where
/// returns None if operation was cancelled by delegate, or the HttpResult.
/// It can be that we return the result just because we didn't understand the status code -
/// caller should check for status himself before assuming it's OK to use
pub async fn upload(&mut self) -> Option<hyper::Result<hyper::Response<hyper::body::Body>>> {
pub async fn upload(
&mut self,
) -> Option<std::result::Result<Response, hyper_util::client::legacy::Error>> {
let mut start = match self.start_at {
Some(s) => s,
None => match self.query_transfer_status().await {
Expand Down Expand Up @@ -694,7 +685,7 @@ where
if self.delegate.cancel_chunk_upload(&range_header) {
return None;
}
let res = self
match self
.client
.request(
hyper::Request::builder()
Expand All @@ -703,37 +694,38 @@ where
.header("Content-Range", range_header.header_value())
.header(CONTENT_TYPE, format!("{}", self.media_type))
.header(USER_AGENT, self.user_agent.to_string())
.body(hyper::body::Body::from(req_bytes))
.body(Body::new(hyper::body::Bytes::from(req_bytes)))
.unwrap(),
)
.await;
match res {
Ok(res) => {
.await
{
Ok(response) => {
start += request_size;

if res.status() == StatusCode::PERMANENT_REDIRECT {
if response.status() == StatusCode::PERMANENT_REDIRECT {
continue;
}

let (res_parts, res_body) = res.into_parts();
let res_body = match hyper::body::to_bytes(res_body).await {
Ok(res_body) => res_body.into_iter().collect(),
Err(err) => return Some(Err(err)),
};
let res_body_string: String = String::from_utf8(res_body).unwrap();
let reconstructed_result =
hyper::Response::from_parts(res_parts, res_body_string.clone().into());

if !reconstructed_result.status().is_success() {
let (parts, body) = response.into_parts();
let body = to_string(body).await.ok();
let response = Response::from_parts(
parts,
body.clone()
.map(hyper::body::Bytes::from)
.map(Body::new)
.unwrap_or_default(),
);

if !response.status().is_success() {
if let Retry::After(d) = self.delegate.http_failure(
&reconstructed_result,
json::from_str(&res_body_string).ok(),
&response,
body.as_ref().and_then(|value| json::from_str(value).ok()),
) {
sleep(d).await;
continue;
}
}
return Some(Ok(reconstructed_result));
return Some(Ok(response));
}
Err(err) => {
if let Retry::After(d) = self.delegate.http_error(&err) {
Expand Down Expand Up @@ -763,10 +755,19 @@ pub fn remove_json_null_values(value: &mut json::value::Value) {
}

// Borrowing the body object as mutable and converts it to a string
pub async fn get_body_as_string(res_body: &mut hyper::Body) -> String {
let res_body_buf = hyper::body::to_bytes(res_body).await.unwrap();
let res_body_string = String::from_utf8_lossy(&res_body_buf);
res_body_string.to_string()
pub async fn get_body_as_string<T: hyper::body::Body>(body: T) -> String {
to_string(body).await.unwrap_or_default()
}

async fn to_bytes<T: hyper::body::Body>(
body: T,
) -> std::result::Result<hyper::body::Bytes, T::Error> {
use http_body_util::BodyExt;
Ok(body.collect().await?.to_bytes())
}

async fn to_string<T: hyper::body::Body>(body: T) -> std::result::Result<String, T::Error> {
Ok(String::from_utf8_lossy(&to_bytes(body).await?).to_string())
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 926ad7b

Please sign in to comment.