Skip to content

Commit

Permalink
Add Client stats
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema authored Sep 12, 2024
1 parent 6607667 commit 9190ca5
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,5 @@ create_consumer_strict_on_stream
leafnodes
get_stream
get_stream_no_info
lifecycle
AtomicU64
40 changes: 40 additions & 0 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct Client {
inbox_prefix: Arc<str>,
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
connection_stats: Arc<Statistics>,
}

impl Sink<PublishMessage> for Client {
Expand All @@ -108,6 +109,7 @@ impl Sink<PublishMessage> for Client {
}

impl Client {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
info: tokio::sync::watch::Receiver<ServerInfo>,
state: tokio::sync::watch::Receiver<State>,
Expand All @@ -116,6 +118,7 @@ impl Client {
inbox_prefix: String,
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
statistics: Arc<Statistics>,
) -> Client {
let poll_sender = PollSender::new(sender.clone());
Client {
Expand All @@ -128,6 +131,7 @@ impl Client {
inbox_prefix: inbox_prefix.into(),
request_timeout,
max_payload,
connection_stats: statistics,
}
}

Expand Down Expand Up @@ -649,6 +653,26 @@ impl Client {
.await
.map_err(Into::into)
}

/// Returns struct representing statistics of the whole lifecycle of the client.
/// This includes number of bytes sent/received, number of messages sent/received,
/// and number of times the connection was established.
/// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
/// across threads.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let statistics = client.statistics();
/// println!("client statistics: {:#?}", statistics);
/// # Ok(())
/// # }
/// ```
pub fn statistics(&self) -> Arc<Statistics> {
self.connection_stats.clone()
}
}

/// Used for building customized requests.
Expand Down Expand Up @@ -826,3 +850,19 @@ impl Display for FlushErrorKind {
}

pub type FlushError = Error<FlushErrorKind>;

/// Represents statistics for the instance of the client throughout its lifecycle.
#[derive(Default, Debug)]
pub struct Statistics {
/// Number of bytes received. This does not include the protocol overhead.
pub in_bytes: AtomicU64,
/// Number of bytes sent. This doe not include the protocol overhead.
pub out_bytes: AtomicU64,
/// Number of messages received.
pub in_messages: AtomicU64,
/// Number of messages sent.
pub out_messages: AtomicU64,
/// Number of times connection was established.
/// Initial connect will be counted as well, then all successful reconnects.
pub connects: AtomicU64,
}
10 changes: 9 additions & 1 deletion async-nats/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

use crate::auth::Auth;
use crate::client::Statistics;
use crate::connection::Connection;
use crate::connection::State;
use crate::options::CallbackArg1;
Expand Down Expand Up @@ -40,6 +41,7 @@ use std::cmp;
use std::io;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -70,6 +72,7 @@ pub(crate) struct Connector {
/// A map of servers and number of connect attempts.
servers: Vec<(ServerAddr, usize)>,
options: ConnectorOptions,
pub(crate) connect_stats: Arc<Statistics>,
attempts: usize,
pub(crate) events_tx: tokio::sync::mpsc::Sender<Event>,
pub(crate) state_tx: tokio::sync::watch::Sender<State>,
Expand All @@ -93,6 +96,7 @@ impl Connector {
events_tx: tokio::sync::mpsc::Sender<Event>,
state_tx: tokio::sync::watch::Sender<State>,
max_payload: Arc<AtomicUsize>,
connect_stats: Arc<Statistics>,
) -> Result<Connector, io::Error> {
let servers = addrs.to_server_addrs()?.map(|addr| (addr, 0)).collect();

Expand All @@ -103,13 +107,16 @@ impl Connector {
events_tx,
state_tx,
max_payload,
connect_stats,
})
}

pub(crate) async fn connect(&mut self) -> Result<(ServerInfo, Connection), ConnectError> {
loop {
match self.try_connect().await {
Ok(inner) => return Ok(inner),
Ok(inner) => {
return Ok(inner);
}
Err(error) => match error.kind() {
ConnectErrorKind::MaxReconnects => {
return Err(ConnectError::with_source(
Expand Down Expand Up @@ -284,6 +291,7 @@ impl Connector {
Some(_) => {
tracing::debug!("connected to {}", server_info.port);
self.attempts = 0;
self.connect_stats.connects.add(1, Ordering::Relaxed);
self.events_tx.send(Event::Connected).await.ok();
self.state_tx.send(State::Connected).ok();
self.max_payload.store(
Expand Down
4 changes: 4 additions & 0 deletions async-nats/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ impl HeaderMap {
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

pub fn len(&self) -> usize {
self.inner.len()
}
}

impl HeaderMap {
Expand Down
52 changes: 50 additions & 2 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ use std::pin::Pin;
use std::slice;
use std::str::{self, FromStr};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::ErrorKind;
Expand Down Expand Up @@ -251,7 +252,9 @@ mod connector;
mod options;

pub use auth::Auth;
pub use client::{Client, PublishError, Request, RequestError, RequestErrorKind, SubscribeError};
pub use client::{
Client, PublishError, Request, RequestError, RequestErrorKind, Statistics, SubscribeError,
};
pub use options::{AuthError, ConnectOptions};

mod crypto;
Expand Down Expand Up @@ -667,6 +670,15 @@ impl ConnectionHandler {
description,
length,
} => {
self.connector
.connect_stats
.in_messages
.add(1, Ordering::Relaxed);
self.connector
.connect_stats
.in_bytes
.add(length as u64, Ordering::Relaxed);

if let Some(subscription) = self.subscriptions.get_mut(&sid) {
let message: Message = Message {
subject,
Expand Down Expand Up @@ -796,6 +808,11 @@ impl ConnectionHandler {
} => {
let (prefix, token) = respond.rsplit_once('.').expect("malformed request subject");

let header_len = headers
.as_ref()
.map(|headers| headers.len())
.unwrap_or_default();

let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() {
multiplexer
} else {
Expand All @@ -814,13 +831,23 @@ impl ConnectionHandler {
senders: HashMap::new(),
})
};
self.connector
.connect_stats
.out_messages
.add(1, Ordering::Relaxed);

multiplexer.senders.insert(token.to_owned(), sender);

let respond: Subject = format!("{}{}", multiplexer.prefix, token).into();

self.connector.connect_stats.out_bytes.add(
(payload.len() + respond.len() + subject.len() + header_len) as u64,
Ordering::Relaxed,
);
let pub_op = ClientOp::Publish {
subject,
payload,
respond: Some(format!("{}{}", multiplexer.prefix, token).into()),
respond: Some(respond),
headers,
};

Expand All @@ -833,6 +860,24 @@ impl ConnectionHandler {
reply: respond,
headers,
}) => {
self.connector
.connect_stats
.out_messages
.add(1, Ordering::Relaxed);

let header_len = headers
.as_ref()
.map(|headers| headers.len())
.unwrap_or_default();

self.connector.connect_stats.out_bytes.add(
(payload.len()
+ respond.as_ref().map_or_else(|| 0, |r| r.len())
+ subject.len()
+ header_len) as u64,
Ordering::Relaxed,
);

self.connection.enqueue_write_op(&ClientOp::Publish {
subject,
payload,
Expand Down Expand Up @@ -907,6 +952,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending);
// We're setting it to the default server payload size.
let max_payload = Arc::new(AtomicUsize::new(1024 * 1024));
let statistics = Arc::new(Statistics::default());

let mut connector = Connector::new(
addrs,
Expand All @@ -931,6 +977,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
events_tx,
state_tx,
max_payload.clone(),
statistics.clone(),
)
.map_err(|err| ConnectError::with_source(ConnectErrorKind::ServerParse, err))?;

Expand All @@ -954,6 +1001,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
options.inbox_prefix,
options.request_timeout,
max_payload,
statistics,
);

task::spawn(async move {
Expand Down
65 changes: 65 additions & 0 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod client {
use futures::stream::StreamExt;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::time::Duration;

#[tokio::test]
Expand Down Expand Up @@ -931,4 +932,68 @@ mod client {
.await
.unwrap();
}

#[tokio::test]
async fn client_statistics() {
let server = nats_server::run_basic_server();

let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let client = async_nats::ConnectOptions::new()
.event_callback(move |event| {
let tx = tx.clone();
async move {
if let Event::Connected = event {
tx.send(()).await.unwrap();
}
}
})
.connect(server.client_url())
.await
.unwrap();

tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
let stats = client.statistics();

assert_eq!(stats.in_messages.load(Ordering::Relaxed), 0);
assert_eq!(stats.out_messages.load(Ordering::Relaxed), 0);
assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 0);
assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 0);
assert_eq!(stats.connects.load(Ordering::Relaxed), 1);

let mut responder = client.subscribe("request").await.unwrap();
tokio::task::spawn({
let client = client.clone();
async move {
let msg = responder.next().await.unwrap();
client
.publish(msg.reply.unwrap(), "response".into())
.await
.unwrap();
}
});
client.request("request", "data".into()).await.unwrap();

let mut sub = client.subscribe("test").await.unwrap();
client.publish("test", "data".into()).await.unwrap();
client.publish("test", "data".into()).await.unwrap();
sub.next().await.unwrap();
sub.next().await.unwrap();

client.flush().await.unwrap();
client.force_reconnect().await.unwrap();

tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();

assert_eq!(stats.in_messages.load(Ordering::Relaxed), 4);
assert_eq!(stats.out_messages.load(Ordering::Relaxed), 4);
assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 139);
assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 139);
assert_eq!(stats.connects.load(Ordering::Relaxed), 2);
}
}

0 comments on commit 9190ca5

Please sign in to comment.