diff --git a/Cargo.lock b/Cargo.lock index 208562669b9..dc3848081b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -773,6 +773,7 @@ dependencies = [ "futures 0.3.5", "genesis", "http_api", + "http_metrics", "lazy_static", "lighthouse_metrics", "network", @@ -2337,6 +2338,28 @@ dependencies = [ "types", ] +[[package]] +name = "http_metrics" +version = "0.1.0" +dependencies = [ + "beacon_chain", + "environment", + "eth2_libp2p", + "lazy_static", + "lighthouse_metrics", + "procinfo", + "prometheus", + "psutil", + "reqwest", + "serde", + "slog", + "slot_clock", + "store", + "tokio 0.2.22", + "types", + "warp", +] + [[package]] name = "httparse" version = "1.3.4" diff --git a/Cargo.toml b/Cargo.toml index 69ab6cc32ac..937a2a8d7a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "beacon_node/eth1", "beacon_node/eth2_libp2p", "beacon_node/http_api", + "beacon_node/http_metrics", "beacon_node/network", "beacon_node/rest_api", "beacon_node/store", diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 80513af76a3..3b5ce51658c 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -41,3 +41,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" } time = "0.2.16" bus = "2.2.3" http_api = { path = "../http_api" } +http_metrics = { path = "../http_metrics" } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index f9dd06aea69..1fccad6129a 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -17,10 +17,10 @@ use eth2_libp2p::NetworkGlobals; use genesis::{interop_genesis_state, Eth1GenesisService}; use network::{NetworkConfig, NetworkMessage, NetworkService}; use parking_lot::Mutex; -use slog::info; +use slog::{debug, info}; use ssz::Decode; use std::net::SocketAddr; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use timer::spawn_timer; @@ -60,7 +60,10 @@ pub struct ClientBuilder { event_handler: Option, network_globals: Option>>, network_send: Option>>, + db_path: Option, + freezer_db_path: Option, http_api_config: http_api::Config, + http_metrics_config: http_metrics::Config, websocket_listen_addr: Option, eth_spec_instance: T::EthSpec, } @@ -102,7 +105,10 @@ where event_handler: None, network_globals: None, network_send: None, + db_path: None, + freezer_db_path: None, http_api_config: <_>::default(), + http_metrics_config: <_>::default(), websocket_listen_addr: None, eth_spec_instance, } @@ -285,6 +291,12 @@ where self } + /// Provides configuration for the HTTP server that serves Prometheus metrics. + pub fn http_metrics_config(mut self, config: http_metrics::Config) -> Self { + self.http_metrics_config = config; + self + } + /// Immediately starts the service that periodically logs information each slot. pub fn notifier(self) -> Result { let context = self @@ -354,6 +366,7 @@ where .unwrap(); // TODO self.runtime_context + .clone() .unwrap() .executor .spawn_without_exit(async move { server.await }, "http-api"); @@ -364,10 +377,38 @@ where None }; + let http_metrics_listen_addr = if self.http_metrics_config.enabled { + let ctx = Arc::new(http_metrics::Context { + config: self.http_metrics_config.clone(), + chain: self.beacon_chain.clone(), + db_path: self.db_path.clone(), + freezer_db_path: self.freezer_db_path.clone(), + log: log.clone(), + }); + + // TODO + let exit = self.runtime_context.as_ref().unwrap().executor.exit(); + + let (listen_addr, server) = http_metrics::serve(ctx, exit) + .map_err(|e| format!("Unable to start HTTP API server: {:?}", e)) + .unwrap(); // TODO + + self.runtime_context + .unwrap() + .executor + .spawn_without_exit(async move { server.await }, "http-api"); + + Some(listen_addr) + } else { + debug!(log, "Metrics server is disabled"); + None + }; + Client { beacon_chain: self.beacon_chain, network_globals: self.network_globals, http_api_listen_addr, + http_metrics_listen_addr, websocket_listen_addr: self.websocket_listen_addr, } } @@ -504,6 +545,9 @@ where .clone() .ok_or_else(|| "disk_store requires a chain spec".to_string())?; + self.db_path = Some(hot_path.into()); + self.freezer_db_path = Some(cold_path.into()); + let store = HotColdDB::open(hot_path, cold_path, config, spec, context.log().clone()) .map_err(|e| format!("Unable to open database: {:?}", e))?; self.store = Some(Arc::new(store)); diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index aa712951cb3..a2975129f1e 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -67,6 +67,7 @@ pub struct Config { pub websocket_server: websocket_server::Config, pub eth1: eth1::Config, pub http_api: http_api::Config, + pub http_metrics: http_metrics::Config, } impl Default for Config { @@ -88,6 +89,7 @@ impl Default for Config { disabled_forks: Vec::new(), graffiti: Graffiti::default(), http_api: <_>::default(), + http_metrics: <_>::default(), } } } diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index f460f2c5568..6b721aee924 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -25,6 +25,8 @@ pub struct Client { network_globals: Option>>, /// Listen address for the standard eth2.0 API, if the service was started. http_api_listen_addr: Option, + /// Listen address for the HTTP server which serves Prometheus metrics. + http_metrics_listen_addr: Option, websocket_listen_addr: Option, } @@ -39,6 +41,11 @@ impl Client { self.http_api_listen_addr } + /// Returns the address of the client's HTTP Prometheus metrics server, if it was started. + pub fn http_metrics_listen_addr(&self) -> Option { + self.http_metrics_listen_addr + } + /// Returns the address of the client's WebSocket API server, if it was started. pub fn websocket_listen_addr(&self) -> Option { self.websocket_listen_addr diff --git a/beacon_node/http_metrics/Cargo.toml b/beacon_node/http_metrics/Cargo.toml new file mode 100644 index 00000000000..8dc7f969d24 --- /dev/null +++ b/beacon_node/http_metrics/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "http_metrics" +version = "0.1.0" +authors = ["Paul Hauner "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +prometheus = "0.9.0" +warp = "0.2.5" +serde = { version = "1.0.110", features = ["derive"] } +slog = "2.5.2" +beacon_chain = { path = "../beacon_chain" } +store = { path = "../store" } +eth2_libp2p = { path = "../eth2_libp2p" } +slot_clock = { path = "../../common/slot_clock" } +lighthouse_metrics = { path = "../../common/lighthouse_metrics" } +lazy_static = "1.4.0" + +[dev-dependencies] +tokio = { version = "0.2.21", features = ["sync"] } +reqwest = { version = "0.10.8", features = ["json"] } +environment = { path = "../../lighthouse/environment" } +types = { path = "../../consensus/types" } + +[target.'cfg(target_os = "linux")'.dependencies] +psutil = "3.1.0" +procinfo = "0.4.2" diff --git a/beacon_node/http_metrics/src/health.rs b/beacon_node/http_metrics/src/health.rs new file mode 100644 index 00000000000..e5ffde06c42 --- /dev/null +++ b/beacon_node/http_metrics/src/health.rs @@ -0,0 +1,72 @@ +use serde::{Deserialize, Serialize}; + +#[cfg(target_os = "linux")] +use {procinfo::pid, psutil::process::Process}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +/// Reports on the health of the Lighthouse instance. +pub struct Health { + /// The pid of this process. + pub pid: u32, + /// The number of threads used by this pid. + pub pid_num_threads: i32, + /// The total resident memory used by this pid. + pub pid_mem_resident_set_size: u64, + /// The total virtual memory used by this pid. + pub pid_mem_virtual_memory_size: u64, + /// Total virtual memory on the system + pub sys_virt_mem_total: u64, + /// Total virtual memory available for new processes. + pub sys_virt_mem_available: u64, + /// Total virtual memory used on the system + pub sys_virt_mem_used: u64, + /// Total virtual memory not used on the system + pub sys_virt_mem_free: u64, + /// Percentage of virtual memory used on the system + pub sys_virt_mem_percent: f32, + /// System load average over 1 minute. + pub sys_loadavg_1: f64, + /// System load average over 5 minutes. + pub sys_loadavg_5: f64, + /// System load average over 15 minutes. + pub sys_loadavg_15: f64, +} + +impl Health { + #[cfg(not(target_os = "linux"))] + pub fn observe() -> Result { + Err("Health is only available on Linux".into()) + } + + #[cfg(target_os = "linux")] + pub fn observe() -> Result { + let process = + Process::current().map_err(|e| format!("Unable to get current process: {:?}", e))?; + + let process_mem = process + .memory_info() + .map_err(|e| format!("Unable to get process memory info: {:?}", e))?; + + let stat = pid::stat_self().map_err(|e| format!("Unable to get stat: {:?}", e))?; + + let vm = psutil::memory::virtual_memory() + .map_err(|e| format!("Unable to get virtual memory: {:?}", e))?; + let loadavg = + psutil::host::loadavg().map_err(|e| format!("Unable to get loadavg: {:?}", e))?; + + Ok(Self { + pid: process.pid(), + pid_num_threads: stat.num_threads, + pid_mem_resident_set_size: process_mem.rss(), + pid_mem_virtual_memory_size: process_mem.vms(), + sys_virt_mem_total: vm.total(), + sys_virt_mem_available: vm.available(), + sys_virt_mem_used: vm.used(), + sys_virt_mem_free: vm.free(), + sys_virt_mem_percent: vm.percent(), + sys_loadavg_1: loadavg.one, + sys_loadavg_5: loadavg.five, + sys_loadavg_15: loadavg.fifteen, + }) + } +} diff --git a/beacon_node/http_metrics/src/lib.rs b/beacon_node/http_metrics/src/lib.rs new file mode 100644 index 00000000000..8add7ab71ed --- /dev/null +++ b/beacon_node/http_metrics/src/lib.rs @@ -0,0 +1,106 @@ +#[macro_use] +extern crate lazy_static; + +mod health; +mod metrics; + +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use serde::{Deserialize, Serialize}; +use slog::{crit, info, Logger}; +use std::future::Future; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::path::PathBuf; +use std::sync::Arc; +use warp::{http::Response, Filter}; + +#[derive(Debug)] +pub enum Error { + Warp(warp::Error), + Other(String), +} + +impl From for Error { + fn from(e: warp::Error) -> Self { + Error::Warp(e) + } +} + +impl From for Error { + fn from(e: String) -> Self { + Error::Other(e) + } +} + +pub struct Context { + pub config: Config, + pub chain: Option>>, + pub db_path: Option, + pub freezer_db_path: Option, + pub log: Logger, +} + +#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub enabled: bool, + pub listen_addr: Ipv4Addr, + pub listen_port: u16, + pub allow_origin: Option, +} + +impl Default for Config { + fn default() -> Self { + Self { + enabled: false, + listen_addr: Ipv4Addr::new(127, 0, 0, 1), + listen_port: 5054, + allow_origin: None, + } + } +} + +pub fn serve( + ctx: Arc>, + shutdown: impl Future + Send + Sync + 'static, +) -> Result<(SocketAddr, impl Future), Error> { + let config = &ctx.config; + let log = ctx.log.clone(); + + if !config.enabled { + crit!(log, "Cannot start disabled metrics HTTP server"); + return Err(Error::Other( + "A disabled metrics server should not be started".to_string(), + )); + } + + let inner_ctx = ctx.clone(); + let routes = warp::get() + .and(warp::path("metrics")) + .map(move || inner_ctx.clone()) + .and_then(|ctx: Arc>| async move { + Ok::<_, warp::Rejection>( + metrics::gather_prometheus_metrics(&ctx) + .map(|body| Response::builder().status(200).body(body).unwrap()) + .unwrap_or_else(|e| { + Response::builder() + .status(500) + .body(format!("Unable to gather metrics: {:?}", e)) + .unwrap() + }), + ) + }); + + let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( + SocketAddrV4::new(config.listen_addr, config.listen_port), + async { + shutdown.await; + }, + )?; + + info!( + log, + "Metrics HTTP server started"; + "listen_address" => listening_socket.to_string(), + ); + + Ok((listening_socket, server)) +} diff --git a/beacon_node/http_metrics/src/metrics.rs b/beacon_node/http_metrics/src/metrics.rs new file mode 100644 index 00000000000..9e462d1d1b1 --- /dev/null +++ b/beacon_node/http_metrics/src/metrics.rs @@ -0,0 +1,105 @@ +use crate::health::Health; +use crate::Context; +use beacon_chain::BeaconChainTypes; +use lighthouse_metrics::{Encoder, TextEncoder}; + +pub use lighthouse_metrics::*; + +lazy_static! { + pub static ref PROCESS_NUM_THREADS: Result = try_create_int_gauge( + "process_num_threads", + "Number of threads used by the current process" + ); + pub static ref PROCESS_RES_MEM: Result = try_create_int_gauge( + "process_resident_memory_bytes", + "Resident memory used by the current process" + ); + pub static ref PROCESS_VIRT_MEM: Result = try_create_int_gauge( + "process_virtual_memory_bytes", + "Virtual memory used by the current process" + ); + pub static ref SYSTEM_VIRT_MEM_TOTAL: Result = + try_create_int_gauge("system_virt_mem_total_bytes", "Total system virtual memory"); + pub static ref SYSTEM_VIRT_MEM_AVAILABLE: Result = try_create_int_gauge( + "system_virt_mem_available_bytes", + "Available system virtual memory" + ); + pub static ref SYSTEM_VIRT_MEM_USED: Result = + try_create_int_gauge("system_virt_mem_used_bytes", "Used system virtual memory"); + pub static ref SYSTEM_VIRT_MEM_FREE: Result = + try_create_int_gauge("system_virt_mem_free_bytes", "Free system virtual memory"); + pub static ref SYSTEM_VIRT_MEM_PERCENTAGE: Result = try_create_float_gauge( + "system_virt_mem_percentage", + "Percentage of used virtual memory" + ); + pub static ref SYSTEM_LOADAVG_1: Result = + try_create_float_gauge("system_loadavg_1", "Loadavg over 1 minute"); + pub static ref SYSTEM_LOADAVG_5: Result = + try_create_float_gauge("system_loadavg_5", "Loadavg over 5 minutes"); + pub static ref SYSTEM_LOADAVG_15: Result = + try_create_float_gauge("system_loadavg_15", "Loadavg over 15 minutes"); +} + +pub fn gather_prometheus_metrics( + ctx: &Context, +) -> std::result::Result { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + // There are two categories of metrics: + // + // - Dynamically updated: things like histograms and event counters that are updated on the + // fly. + // - Statically updated: things which are only updated at the time of the scrape (used where we + // can avoid cluttering up code with metrics calls). + // + // The `lighthouse_metrics` crate has a `DEFAULT_REGISTRY` global singleton (via `lazy_static`) + // which keeps the state of all the metrics. Dynamically updated things will already be + // up-to-date in the registry (because they update themselves) however statically updated + // things need to be "scraped". + // + // We proceed by, first updating all the static metrics using `scrape_for_metrics(..)`. Then, + // using `lighthouse_metrics::gather(..)` to collect the global `DEFAULT_REGISTRY` metrics into + // a string that can be returned via HTTP. + + if let Some(beacon_chain) = ctx.chain.as_ref() { + slot_clock::scrape_for_metrics::(&beacon_chain.slot_clock); + beacon_chain::scrape_for_metrics(beacon_chain); + } + + if let (Some(db_path), Some(freezer_db_path)) = + (ctx.db_path.as_ref(), ctx.freezer_db_path.as_ref()) + { + store::scrape_for_metrics(db_path, freezer_db_path); + } + + eth2_libp2p::scrape_discovery_metrics(); + + // This will silently fail if we are unable to observe the health. This is desired behaviour + // since we don't support `Health` for all platforms. + if let Ok(health) = Health::observe() { + set_gauge(&PROCESS_NUM_THREADS, health.pid_num_threads as i64); + set_gauge(&PROCESS_RES_MEM, health.pid_mem_resident_set_size as i64); + set_gauge(&PROCESS_VIRT_MEM, health.pid_mem_virtual_memory_size as i64); + set_gauge(&SYSTEM_VIRT_MEM_TOTAL, health.sys_virt_mem_total as i64); + set_gauge( + &SYSTEM_VIRT_MEM_AVAILABLE, + health.sys_virt_mem_available as i64, + ); + set_gauge(&SYSTEM_VIRT_MEM_USED, health.sys_virt_mem_used as i64); + set_gauge(&SYSTEM_VIRT_MEM_FREE, health.sys_virt_mem_free as i64); + set_float_gauge( + &SYSTEM_VIRT_MEM_PERCENTAGE, + health.sys_virt_mem_percent as f64, + ); + set_float_gauge(&SYSTEM_LOADAVG_1, health.sys_loadavg_1); + set_float_gauge(&SYSTEM_LOADAVG_5, health.sys_loadavg_5); + set_float_gauge(&SYSTEM_LOADAVG_15, health.sys_loadavg_15); + } + + encoder + .encode(&lighthouse_metrics::gather(), &mut buffer) + .unwrap(); + + String::from_utf8(buffer).map_err(|e| format!("Failed to encode prometheus info: {:?}", e)) +} diff --git a/beacon_node/http_metrics/tests/tests.rs b/beacon_node/http_metrics/tests/tests.rs new file mode 100644 index 00000000000..18a40d4f849 --- /dev/null +++ b/beacon_node/http_metrics/tests/tests.rs @@ -0,0 +1,46 @@ +use beacon_chain::test_utils::BlockingMigratorEphemeralHarnessType; +use environment::null_logger; +use http_metrics::Config; +use reqwest::StatusCode; +use std::net::Ipv4Addr; +use std::sync::Arc; +use tokio::sync::oneshot; +use types::MainnetEthSpec; + +type Context = http_metrics::Context>; + +#[tokio::test(core_threads = 2)] +async fn returns_200_ok() { + let log = null_logger().unwrap(); + + let context = Arc::new(Context { + config: Config { + enabled: true, + listen_addr: Ipv4Addr::new(127, 0, 0, 1), + listen_port: 0, + allow_origin: None, + }, + chain: None, + db_path: None, + freezer_db_path: None, + log, + }); + + let ctx = context.clone(); + let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown = async { + // It's not really interesting why this triggered, just that it happened. + let _ = shutdown_rx.await; + }; + let (listening_socket, server) = http_metrics::serve(ctx, server_shutdown).unwrap(); + + tokio::spawn(async { server.await }); + + let url = format!( + "http://{}:{}/metrics", + listening_socket.ip(), + listening_socket.port() + ); + + assert_eq!(reqwest::get(&url).await.unwrap().status(), StatusCode::OK); +} diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 6caa8acd9b2..817f9e702bd 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -163,6 +163,38 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("") .takes_value(true), ) + /* Prometheus metrics HTTP server related arguments */ + .arg( + Arg::with_name("metrics") + .long("metrics") + .help("Enable Prometheus metrics HTTP server. Disabled by default.") + .takes_value(false), + ) + .arg( + Arg::with_name("metrics-address") + .long("metrics-address") + .value_name("ADDRESS") + .help("Set the listen address for the Prometheus metrics HTTP server.") + .default_value("127.0.0.1") + .takes_value(true), + ) + .arg( + Arg::with_name("metrics-port") + .long("metrics-port") + .value_name("PORT") + .help("Set the listen TCP port for the Prometheus metrics HTTP server.") + .default_value("5054") + .takes_value(true), + ) + .arg( + Arg::with_name("metrics-allow-origin") + .long("metrics-allow-origin") + .value_name("ORIGIN") + .help("Set the value of the Access-Control-Allow-Origin response HTTP header for the Prometheus metrics HTTP server. \ + Use * to allow any origin (not recommended in production)") + .default_value("") + .takes_value(true), + ) /* Websocket related arguments */ .arg( Arg::with_name("ws") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index ac2585617f6..073ee3b8326 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -84,7 +84,7 @@ pub fn get_config( )?; /* - * Http server + * Http API server */ if cli_args.is_present("http") { @@ -112,6 +112,35 @@ pub fn get_config( client_config.http_api.allow_origin = Some(allow_origin.to_string()); } + /* + * Prometheus metrics HTTP server + */ + + if cli_args.is_present("metrics") { + client_config.http_metrics.enabled = true; + } + + if let Some(address) = cli_args.value_of("metrics-address") { + client_config.http_metrics.listen_addr = address + .parse::() + .map_err(|_| "metrics-address is not a valid IPv4 address.")?; + } + + if let Some(port) = cli_args.value_of("metrics-port") { + client_config.http_metrics.listen_port = port + .parse::() + .map_err(|_| "metrics-port is not a valid u16.")?; + } + + if let Some(allow_origin) = cli_args.value_of("metrics-allow-origin") { + // Pre-validate the config value to give feedback to the user on node startup, instead of + // as late as when the first API response is produced. + hyper::header::HeaderValue::from_str(allow_origin) + .map_err(|_| "Invalid allow-origin value")?; + + client_config.http_metrics.allow_origin = Some(allow_origin.to_string()); + } + /* * Websocket server */ diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 149b61187c7..b00ffd8de79 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -131,6 +131,7 @@ impl ProductionBeaconNode { .await? .notifier()? .http_api_config(client_config.http_api.clone()) + .http_metrics_config(client_config.http_metrics.clone()) .build(), )) }