Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(server): Use async/await in all endpoints #1862

Merged
merged 9 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 63 additions & 75 deletions relay-server/src/body/peek_line.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use actix_web::error::PayloadError;
use actix_web::HttpRequest;
use bytes::Bytes;
use futures01::{Async, Future, Poll, Stream};
use smallvec::SmallVec;

use crate::extractors::{Decoder, SharedPayload};

/// A request body adapter that peeks the first line of a multi-line body.
/// Peeks the first line of a multi-line body without consuming it.
///
/// `PeekLine` is a future created from a request's [`Payload`]. It is especially designed to be
/// used together with [`SharedPayload`](crate::extractors::SharedPayload), since it polls an
/// underlying payload and places the polled chunks back into the payload when completing.
/// This function returns a future to the request [`Payload`]. It is especially designed to be used
/// together with [`SharedPayload`], since it polls an underlying payload and places the polled
/// chunks back into the payload when completing.
///
/// If the payload does not contain a newline, the entire payload is returned by the future. To
/// contrain this, use `limit` to set a maximum size returned by the future.
Expand All @@ -18,110 +18,87 @@ use crate::extractors::{Decoder, SharedPayload};
/// smaller than the size limit. Otherwise, resolves to `None`. Any errors on the underlying stream
/// are returned without change.
///
/// # Cancel Safety
///
/// This function is _not_ cancellation safe. If canceled, partially read data may not be put back
/// into the request. Additionally, it is not safe to read the body after an error has been returned
/// since data may have been partially consumed.
///
/// [`Payload`]: actix_web::dev::Payload
pub struct PeekLine {
payload: SharedPayload,
decoder: Decoder,
chunks: SmallVec<[Bytes; 3]>,
}

impl PeekLine {
/// Creates a new peek line future from the given payload.
///
/// Note that the underlying stream may return more data than the configured limit. The future
/// will still never resolve more than the limit set.
pub fn new<S>(request: &HttpRequest<S>, limit: usize) -> Self {
Self {
payload: SharedPayload::get(request),
decoder: Decoder::new(request, limit),
chunks: SmallVec::new(),
}
/// [`SharedPayload`]: crate::extractors::SharedPayload
pub async fn peek_line<S>(
request: &HttpRequest<S>,
limit: usize,
) -> Result<Option<Bytes>, PayloadError> {
let mut payload = SharedPayload::get(request);
let mut decoder = Decoder::new(request, limit);
let mut chunks = SmallVec::<[_; 3]>::new();
let mut overflow = false;

while let (Some(chunk), false) = (payload.chunk().await?, overflow) {
overflow = decoder.decode(&chunk)?;
chunks.push(chunk);
}

fn revert_chunks(&mut self) {
// unread in reverse order
while let Some(chunk) = self.chunks.pop() {
self.payload.unread_data(chunk);
}
}

fn finish(&mut self, overflow: bool) -> std::io::Result<Option<Bytes>> {
let buffer = self.decoder.finish()?;
let buffer = decoder.finish()?;

let line = match buffer.iter().position(|b| *b == b'\n') {
Some(pos) => Some(buffer.slice_to(pos)),
None if !overflow => Some(buffer),
None => None,
};
let line = match buffer.iter().position(|b| *b == b'\n') {
Some(pos) => Some(buffer.slice_to(pos)),
None if !overflow => Some(buffer),
None => None,
};

self.revert_chunks();
Ok(line.filter(|line| !line.is_empty()))
// unread in reverse order
while let Some(chunk) = chunks.pop() {
payload.unread_data(chunk);
}
}

impl Future for PeekLine {
type Item = Option<Bytes>;
type Error = <SharedPayload as Stream>::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let chunk = match self.payload.poll()? {
Async::Ready(Some(chunk)) => chunk,
Async::Ready(None) => return Ok(Async::Ready(self.finish(false)?)),
Async::NotReady => return Ok(Async::NotReady),
};

self.chunks.push(chunk.clone());
if self.decoder.decode(chunk)? {
return Ok(Async::Ready(self.finish(true)?));
}
}
}
Ok(line.filter(|line| !line.is_empty()))
}

#[cfg(test)]
mod tests {
use super::*;
use relay_test::TestRequest;

#[test]
fn test_empty() {
#[tokio::test]
async fn test_empty() {
relay_test::setup();

let request = TestRequest::with_state(())
.set_payload("".to_string())
.finish();

let opt = relay_test::block_fn(move || PeekLine::new(&request, 10)).unwrap();
let opt = peek_line(&request, 10).await.unwrap();
assert_eq!(opt, None);
}

#[test]
fn test_one_line() {
#[tokio::test]
async fn test_one_line() {
relay_test::setup();

let request = TestRequest::with_state(())
.set_payload("test".to_string())
.finish();

let opt = relay_test::block_fn(move || PeekLine::new(&request, 10)).unwrap();
let opt = peek_line(&request, 10).await.unwrap();
assert_eq!(opt, Some("test".into()));
}

#[test]
fn test_linebreak() {
#[tokio::test]
async fn test_linebreak() {
relay_test::setup();

let request = TestRequest::with_state(())
.set_payload("test\ndone".to_string())
.finish();

let opt = relay_test::block_fn(move || PeekLine::new(&request, 10)).unwrap();
let opt = peek_line(&request, 10).await.unwrap();
assert_eq!(opt, Some("test".into()));
}

#[test]
fn test_limit_satisfied() {
#[tokio::test]
async fn test_limit_satisfied() {
relay_test::setup();

let payload = "test\ndone";
Expand All @@ -130,12 +107,12 @@ mod tests {
.finish();

// NOTE: Newline fits into the size limit.
let opt = relay_test::block_fn(move || PeekLine::new(&request, 5)).unwrap();
let opt = peek_line(&request, 5).await.unwrap();
assert_eq!(opt, Some("test".into()));
}

#[test]
fn test_limit_exceeded() {
#[tokio::test]
async fn test_limit_exceeded() {
relay_test::setup();

let payload = "test\ndone";
Expand All @@ -145,11 +122,22 @@ mod tests {

// NOTE: newline is not found within the size limit. even though the payload would fit,
// according to the doc comment we return `None`.
let opt = relay_test::block_fn(move || PeekLine::new(&request, 4)).unwrap();
let opt = peek_line(&request, 4).await.unwrap();
assert_eq!(opt, None);
}

// NB: Repeat polls cannot be tested unfortunately, since `Payload::set_read_buffer_capacity`
// does not take effect in test requests, and the sender returned by `Payload::new` does not
// have a public interface.
#[tokio::test]
async fn test_shared_payload() {
relay_test::setup();

let request = TestRequest::with_state(())
.set_payload("test\ndone".to_string())
.finish();

peek_line(&request, 10).await.unwrap();

let mut payload = SharedPayload::get(&request);
let chunk = payload.chunk().await.unwrap();
assert_eq!(chunk.as_deref(), Some(b"test\ndone".as_slice()));
}
}
61 changes: 14 additions & 47 deletions relay-server/src/body/request_body.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,27 @@
use actix_web::{error::PayloadError, HttpRequest};
use bytes::Bytes;
use futures01::prelude::*;

use crate::extractors::{Decoder, SharedPayload};
use crate::utils;

/// Future that resolves to a complete store endpoint body.
pub struct RequestBody {
err: Option<PayloadError>,
stream: Option<(SharedPayload, Decoder)>,
}

impl RequestBody {
/// Create `ForwardBody` for request.
pub fn new<S>(req: &HttpRequest<S>, limit: usize) -> Self {
if let Some(length) = utils::get_content_length(req) {
if length > limit {
return RequestBody {
stream: None,
err: Some(PayloadError::Overflow),
};
}
}

RequestBody {
stream: Some((SharedPayload::get(req), Decoder::new(req, limit))),
err: None,
/// Reads the body of a Relay endpoint request.
///
/// If the body exceeds the given `limit` during streaming or decompression, an error is returned.
pub async fn request_body<S>(req: &HttpRequest<S>, limit: usize) -> Result<Bytes, PayloadError> {
if let Some(length) = utils::get_content_length(req) {
if length > limit {
return Err(PayloadError::Overflow);
}
}
}

impl Future for RequestBody {
type Item = Bytes;
type Error = PayloadError;
let mut payload = SharedPayload::get(req);
let mut decoder = Decoder::new(req, limit);

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(err) = self.err.take() {
return Err(err);
while let Some(encoded) = payload.chunk().await? {
if decoder.decode(&encoded)? {
return Err(PayloadError::Overflow);
}

if let Some((ref mut payload, ref mut decoder)) = self.stream {
loop {
return match payload.poll()? {
Async::Ready(Some(encoded)) => {
if decoder.decode(encoded)? {
Err(PayloadError::Overflow)
} else {
continue;
}
}
Async::Ready(None) => Ok(Async::Ready(decoder.finish()?)),
Async::NotReady => Ok(Async::NotReady),
};
}
}

panic!("cannot be used second time")
}

Ok(decoder.finish()?)
}
62 changes: 23 additions & 39 deletions relay-server/src/body/store_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,34 @@ use actix_web::{error::PayloadError, HttpRequest};
use bytes::Bytes;
use data_encoding::BASE64;
use flate2::read::ZlibDecoder;
use futures01::prelude::*;
use url::form_urlencoded;

use relay_statsd::metric;

use crate::body::RequestBody;
use crate::body;
use crate::statsd::RelayHistograms;

/// Future that resolves to a complete store endpoint body.
pub struct StoreBody {
inner: RequestBody,
result: Option<Result<Bytes, PayloadError>>,
}

impl StoreBody {
/// Create `StoreBody` for request.
pub fn new<S>(req: &HttpRequest<S>, limit: usize) -> Self {
Self {
inner: RequestBody::new(req, limit),
result: data_from_querystring(req).map(|body| decode_bytes(body.as_bytes())),
}
/// Reads the body of a store request.
///
/// In addition to [`request_body`](crate::body::request_body), this also supports two additional
/// modes of sending encoded payloads:
///
/// - In query parameters of the HTTP request.
/// - As base64-encoded zlib compression without additional HTTP headers.
///
/// If the body exceeds the given `limit` during streaming or decompression, an error is returned.
pub async fn store_body<S>(req: &HttpRequest<S>, limit: usize) -> Result<Bytes, PayloadError> {
if let Some(body) = data_from_querystring(req) {
return decode_bytes(body.as_bytes(), limit);
}
}

impl Future for StoreBody {
type Item = Bytes;
type Error = PayloadError;
let body = body::request_body(req, limit).await?;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(result) = self.result.take() {
return result.map(Async::Ready);
}
metric!(histogram(RelayHistograms::RequestSizeBytesRaw) = body.len() as u64);
let decoded = decode_bytes(body, limit)?;
metric!(histogram(RelayHistograms::RequestSizeBytesUncompressed) = decoded.len() as u64);

let poll = match self.inner.poll()? {
Async::Ready(body) => {
metric!(histogram(RelayHistograms::RequestSizeBytesRaw) = body.len() as u64);
let decoded = decode_bytes(body)?;
metric!(
histogram(RelayHistograms::RequestSizeBytesUncompressed) = decoded.len() as u64
);
Async::Ready(decoded)
}
Async::NotReady => Async::NotReady,
};

Ok(poll)
}
Ok(decoded)
}

fn data_from_querystring<S>(req: &HttpRequest<S>) -> Option<Cow<'_, str>> {
Expand All @@ -65,7 +46,10 @@ fn data_from_querystring<S>(req: &HttpRequest<S>) -> Option<Cow<'_, str>> {
Some(value)
}

fn decode_bytes<B: Into<Bytes> + AsRef<[u8]>>(body: B) -> Result<Bytes, PayloadError> {
fn decode_bytes<B>(body: B, limit: usize) -> Result<Bytes, PayloadError>
where
B: Into<Bytes> + AsRef<[u8]>,
{
if body.as_ref().starts_with(b"{") {
return Ok(body.into());
}
Expand All @@ -79,7 +63,7 @@ fn decode_bytes<B: Into<Bytes> + AsRef<[u8]>>(body: B) -> Result<Bytes, PayloadE
return Ok(binary_body.into());
}

let mut decode_stream = ZlibDecoder::new(binary_body.as_slice());
let mut decode_stream = ZlibDecoder::new(binary_body.as_slice()).take(limit as u64);
let mut bytes = vec![];
decode_stream.read_to_end(&mut bytes)?;

Expand Down
Loading