Skip to content

Commit

Permalink
Improve statistics
Browse files Browse the repository at this point in the history
Client statistics should could all bytes that are coming in and out of
the client, not just part of messages.

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema authored Sep 30, 2024
1 parent 33dbbdc commit e7fba06
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 37 deletions.
54 changes: 35 additions & 19 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::future::{self, Future};
use std::io::IoSlice;
use std::pin::Pin;
use std::str::{self, FromStr};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes, BytesMut};
Expand All @@ -27,7 +29,7 @@ use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite};
use crate::header::{HeaderMap, HeaderName, IntoHeaderValue};
use crate::status::StatusCode;
use crate::subject::Subject;
use crate::{ClientOp, ServerError, ServerOp};
use crate::{ClientOp, ServerError, ServerOp, Statistics};

/// Soft limit for the amount of bytes in [`Connection::write_buf`]
/// and [`Connection::flattened_writes`].
Expand Down Expand Up @@ -80,19 +82,25 @@ pub(crate) struct Connection {
write_buf_len: usize,
flattened_writes: BytesMut,
can_flush: bool,
statistics: Arc<Statistics>,
}

/// Internal representation of the connection.
/// Holds connection with NATS Server and communicates with `Client` via channels.
impl Connection {
pub(crate) fn new(stream: Box<dyn AsyncReadWrite>, read_buffer_capacity: usize) -> Self {
pub(crate) fn new(
stream: Box<dyn AsyncReadWrite>,
read_buffer_capacity: usize,
statistics: Arc<Statistics>,
) -> Self {
Self {
stream,
read_buf: BytesMut::with_capacity(read_buffer_capacity),
write_buf: VecDeque::new(),
write_buf_len: 0,
flattened_writes: BytesMut::new(),
can_flush: false,
statistics,
}
}

Expand Down Expand Up @@ -407,7 +415,10 @@ impl Connection {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(0)) if self.read_buf.is_empty() => Poll::Ready(Ok(None)),
Poll::Ready(Ok(0)) => Poll::Ready(Err(io::ErrorKind::ConnectionReset.into())),
Poll::Ready(Ok(_n)) => continue,
Poll::Ready(Ok(n)) => {
self.statistics.in_bytes.add(n as u64, Ordering::Relaxed);
continue;
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
};
}
Expand Down Expand Up @@ -544,6 +555,7 @@ impl Connection {
match Pin::new(&mut self.stream).poll_write(cx, buf) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(n)) => {
self.statistics.out_bytes.add(n as u64, Ordering::Relaxed);
self.write_buf_len -= n;
self.can_flush = true;

Expand All @@ -564,7 +576,6 @@ impl Connection {
}
}
}

/// Write the internal buffers into the write stream using vectored write operations
///
/// Writes [`WRITE_VECTORED_CHUNKS`] at a time. More efficient _if_
Expand Down Expand Up @@ -595,6 +606,7 @@ impl Connection {
match Pin::new(&mut self.stream).poll_write_vectored(cx, &writes[..writes_len]) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(mut n)) => {
self.statistics.out_bytes.add(n as u64, Ordering::Relaxed);
self.write_buf_len -= n;
self.can_flush = true;

Expand Down Expand Up @@ -673,14 +685,16 @@ impl Connection {

#[cfg(test)]
mod read_op {
use std::sync::Arc;

use super::Connection;
use crate::{HeaderMap, ServerError, ServerInfo, ServerOp, StatusCode};
use crate::{HeaderMap, ServerError, ServerInfo, ServerOp, Statistics, StatusCode};
use tokio::io::{self, AsyncWriteExt};

#[tokio::test]
async fn ok() {
let (stream, mut server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

server.write_all(b"+OK\r\n").await.unwrap();
let result = connection.read_op().await.unwrap();
Expand All @@ -690,7 +704,7 @@ mod read_op {
#[tokio::test]
async fn ping() {
let (stream, mut server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

server.write_all(b"PING\r\n").await.unwrap();
let result = connection.read_op().await.unwrap();
Expand All @@ -700,7 +714,7 @@ mod read_op {
#[tokio::test]
async fn pong() {
let (stream, mut server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

server.write_all(b"PONG\r\n").await.unwrap();
let result = connection.read_op().await.unwrap();
Expand All @@ -710,7 +724,7 @@ mod read_op {
#[tokio::test]
async fn info() {
let (stream, mut server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

server.write_all(b"INFO {}\r\n").await.unwrap();
server.flush().await.unwrap();
Expand All @@ -737,7 +751,7 @@ mod read_op {
#[tokio::test]
async fn error() {
let (stream, mut server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

server.write_all(b"INFO {}\r\n").await.unwrap();
let result = connection.read_op().await.unwrap();
Expand All @@ -759,7 +773,7 @@ mod read_op {
#[tokio::test]
async fn message() {
let (stream, mut server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

server
.write_all(b"MSG FOO.BAR 9 11\r\nHello World\r\n")
Expand Down Expand Up @@ -906,7 +920,7 @@ mod read_op {
#[tokio::test]
async fn unknown() {
let (stream, mut server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

server.write_all(b"ONE\r\n").await.unwrap();
connection.read_op().await.unwrap_err();
Expand Down Expand Up @@ -956,14 +970,16 @@ mod read_op {

#[cfg(test)]
mod write_op {
use std::sync::Arc;

use super::Connection;
use crate::{ClientOp, ConnectInfo, HeaderMap, Protocol};
use crate::{ClientOp, ConnectInfo, HeaderMap, Protocol, Statistics};
use tokio::io::{self, AsyncBufReadExt, BufReader};

#[tokio::test]
async fn publish() {
let (stream, server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

connection
.easy_write_and_flush(
Expand Down Expand Up @@ -1032,7 +1048,7 @@ mod write_op {
#[tokio::test]
async fn subscribe() {
let (stream, server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

connection
.easy_write_and_flush(
Expand Down Expand Up @@ -1071,7 +1087,7 @@ mod write_op {
#[tokio::test]
async fn unsubscribe() {
let (stream, server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

connection
.easy_write_and_flush([ClientOp::Unsubscribe { sid: 11, max: None }].iter())
Expand Down Expand Up @@ -1102,7 +1118,7 @@ mod write_op {
#[tokio::test]
async fn ping() {
let (stream, server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

let mut reader = BufReader::new(server);
let mut buffer = String::new();
Expand All @@ -1120,7 +1136,7 @@ mod write_op {
#[tokio::test]
async fn pong() {
let (stream, server) = io::duplex(128);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

let mut reader = BufReader::new(server);
let mut buffer = String::new();
Expand All @@ -1138,7 +1154,7 @@ mod write_op {
#[tokio::test]
async fn connect() {
let (stream, server) = io::duplex(1024);
let mut connection = Connection::new(Box::new(stream), 0);
let mut connection = Connection::new(Box::new(stream), 0, Arc::new(Statistics::default()));

let mut reader = BufReader::new(server);
let mut buffer = String::new();
Expand Down
7 changes: 6 additions & 1 deletion async-nats/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ impl Connector {
let mut connection = Connection::new(
Box::new(tcp_stream),
self.options.read_buffer_capacity.into(),
self.connect_stats.clone(),
);

let tls_connection = |connection: Connection| async {
Expand All @@ -352,7 +353,11 @@ impl Connector {
.connect(domain.to_owned(), connection.stream)
.await?;

Ok::<Connection, ConnectError>(Connection::new(Box::new(tls_stream), 0))
Ok::<Connection, ConnectError>(Connection::new(
Box::new(tls_stream),
0,
self.connect_stats.clone(),
))
};

// If `tls_first` was set, establish TLS connection before getting INFO.
Expand Down
13 changes: 0 additions & 13 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,10 +674,6 @@ impl ConnectionHandler {
.connect_stats
.in_messages
.add(1, Ordering::Relaxed);
self.connector
.connect_stats
.in_bytes
.add(length as u64, Ordering::Relaxed);

if let Some(subscription) = self.subscriptions.get_mut(&sid) {
let message: Message = Message {
Expand Down Expand Up @@ -808,11 +804,6 @@ impl ConnectionHandler {
} => {
let (prefix, token) = respond.rsplit_once('.').expect("malformed request subject");

let header_len = headers
.as_ref()
.map(|headers| headers.len())
.unwrap_or_default();

let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() {
multiplexer
} else {
Expand Down Expand Up @@ -840,10 +831,6 @@ impl ConnectionHandler {

let respond: Subject = format!("{}{}", multiplexer.prefix, token).into();

self.connector.connect_stats.out_bytes.add(
(payload.len() + respond.len() + subject.len() + header_len) as u64,
Ordering::Relaxed,
);
let pub_op = ClientOp::Publish {
subject,
payload,
Expand Down
8 changes: 4 additions & 4 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,8 +959,8 @@ mod client {

assert_eq!(stats.in_messages.load(Ordering::Relaxed), 0);
assert_eq!(stats.out_messages.load(Ordering::Relaxed), 0);
assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 0);
assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 0);
assert!(stats.in_bytes.load(Ordering::Relaxed) != 0);
assert!(stats.out_bytes.load(Ordering::Relaxed) != 0);
assert_eq!(stats.connects.load(Ordering::Relaxed), 1);

let mut responder = client.subscribe("request").await.unwrap();
Expand Down Expand Up @@ -992,8 +992,8 @@ mod client {

assert_eq!(stats.in_messages.load(Ordering::Relaxed), 4);
assert_eq!(stats.out_messages.load(Ordering::Relaxed), 4);
assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 139);
assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 139);
assert!(stats.in_bytes.load(Ordering::Relaxed) != 0);
assert!(stats.out_bytes.load(Ordering::Relaxed) != 0);
assert_eq!(stats.connects.load(Ordering::Relaxed), 2);
}
}

0 comments on commit e7fba06

Please sign in to comment.