Skip to content

Commit

Permalink
chore: Add decode benchmarks (#207)
Browse files Browse the repository at this point in the history
* disable current benchmarks

* temporarily expose Streaming constructor

* hide Streaming constructor from docs

* remove crossbeam-queue from deny skip list

* add decode benches

* fmt

* use Bytes as MockDecoder Item
  • Loading branch information
alce authored and LucioFranco committed Dec 22, 2019
1 parent 59d008d commit 49ce265
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 3 deletions.
1 change: 0 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ deny = [
{ name = "term" },
]
skip = [
{ name = "crossbeam-queue", version = "=0.2.0" },
{ name = "bytes", version = "=0.4.12" },
]
skip-tree = [
Expand Down
5 changes: 4 additions & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,13 @@ rustls-native-certs = { version = "0.1", optional = true }
tokio = { version = "0.2", features = ["rt-core", "macros"] }
static_assertions = "1.0"
rand = "0.6"
criterion = "0.3"
bencher = "0.1.5"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[[bench]]
name = "decode"
harness = false

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
159 changes: 159 additions & 0 deletions tonic/benches/decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
extern crate bencher;

use std::fmt::{Error, Formatter};
use std::{
pin::Pin,
task::{Context, Poll},
};

use bencher::{benchmark_group, benchmark_main, Bencher};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http_body::Body;
use tokio_util::codec::Decoder;
use tonic::{Status, Streaming};

macro_rules! bench {
($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => {
fn $name(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.expect("runtime");

let payload = make_payload($message_size, $message_count);
let body = MockBody::new(payload, $chunk_size);
b.bytes = body.len() as u64;

b.iter(|| {
rt.block_on(async {
let decoder = MockDecoder::new($message_size);
let mut stream = Streaming::new_request(decoder, body.clone());

let mut count = 0;
while let Some(msg) = stream.message().await.unwrap() {
assert_eq!($message_size, msg.len());
count += 1;
}

assert_eq!(count, $message_count);
assert!(stream.trailers().await.unwrap().is_none());
})
})
}
};
}

#[derive(Clone)]
struct MockBody {
data: Bytes,
chunk_size: usize,
}

impl MockBody {
pub fn new(data: Bytes, chunk_size: usize) -> Self {
MockBody { data, chunk_size }
}

pub fn len(&self) -> usize {
self.data.len()
}
}

impl Body for MockBody {
type Data = Bytes;
type Error = Status;

fn poll_data(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
if self.data.has_remaining() {
let split = std::cmp::min(self.chunk_size, self.data.remaining());
Poll::Ready(Some(Ok(self.data.split_to(split))))
} else {
Poll::Ready(None)
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

impl std::fmt::Debug for MockBody {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
let sample = self.data.iter().take(10).collect::<Vec<_>>();
write!(f, "{:?}...({})", sample, self.data.len())
}
}

#[derive(Debug, Clone)]
struct MockDecoder {
message_size: usize,
}

impl MockDecoder {
fn new(message_size: usize) -> Self {
MockDecoder { message_size }
}
}

impl Decoder for MockDecoder {
type Item = Bytes;
type Error = Status;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let item = buf.split_to(self.message_size).freeze();
Ok(Some(item))
}
}

fn make_payload(message_length: usize, message_count: usize) -> Bytes {
let mut buf = BytesMut::new();

for _ in 0..message_count {
let msg = vec![97u8; message_length];
buf.reserve(msg.len() + 5);
buf.put_u8(0);
buf.put_u32(msg.len() as u32);
buf.put(&msg[..]);
}

buf.freeze()
}

// change body chunk size only
bench!(chunk_size_100, 1_000, 100, 1);
bench!(chunk_size_500, 1_000, 500, 1);
bench!(chunk_size_1005, 1_000, 1_005, 1);

// change message size only
bench!(message_size_1k, 1_000, 1_005, 2);
bench!(message_size_5k, 5_000, 1_005, 2);
bench!(message_size_10k, 10_000, 1_005, 2);

// change message count only
bench!(message_count_1, 500, 505, 1);
bench!(message_count_10, 500, 505, 10);
bench!(message_count_20, 500, 505, 20);

benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005);

benchmark_group!(
message_size,
message_size_1k,
message_size_5k,
message_size_10k
);

benchmark_group!(
message_count,
message_count_1,
message_count_10,
message_count_20
);

benchmark_main!(chunk_size, message_size, message_count);
3 changes: 2 additions & 1 deletion tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ impl<T> Streaming<T> {
Self::new(decoder, body, Direction::EmptyResponse)
}

pub(crate) fn new_request<B, D>(decoder: D, body: B) -> Self
#[doc(hidden)]
pub fn new_request<B, D>(decoder: D, body: B) -> Self
where
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error>,
Expand Down

0 comments on commit 49ce265

Please sign in to comment.