diff --git a/.gitignore b/.gitignore index 6985cf1b..81861b62 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,6 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + +.env +.config.toml \ No newline at end of file diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..ceca3f3e --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "da-commit-dockerized"] + path = da-commit-dockerized + url = https://github.com/LimeChain/da-commit-dockerized.git diff --git a/Cargo.toml b/Cargo.toml index 6bcc0a13..52bc4e0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,9 +6,10 @@ members = [ "crates/common", "crates/crypto", "crates/pbs", + "crates/metrics", "tests", - "examples", + "examples" ] resolver = "2" @@ -22,6 +23,7 @@ cb-cli = { path = "crates/cli" } cb-common = { path = "crates/common" } cb-crypto = { path = "crates/crypto" } cb-pbs = { path = "crates/pbs" } +cb-metrics = { path = "crates/metrics" } # ethereum ethereum-consensus = { git = "https://github.com/ralexstokes/ethereum-consensus", rev = "cf3c404043230559660810bc0c9d6d5a8498d819" } diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..389ac08c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,44 @@ +# Start from the latest Rust image for the build stage +FROM rust:latest AS builder + +# Set the working directory +WORKDIR /usr/src/app + +# Copy the Cargo.toml and Cargo.lock files +COPY ./Cargo.toml ./Cargo.toml +COPY ./Cargo.lock ./Cargo.lock + +# Copy the source code +COPY ./bin ./bin +COPY ./crates ./crates +COPY ./tests ./tests +COPY ./examples ./examples + +# Build the application +RUN cargo build --bin commit-boost + +# Use Ubuntu 22.04 for runtime to ensure OpenSSL 3.x is available +FROM ubuntu:22.04 + +# Install OpenSSL and necessary libraries +RUN apt-get update && apt-get install -y \ + openssl \ + ca-certificates \ + libssl3 \ + && rm -rf /var/lib/apt/lists/* + +# Copy the built binary from the builder stage +COPY --from=builder /usr/src/app/target/debug/commit-boost /usr/local/bin/commit-boost + +# Copy the configuration file +COPY ./config.dockerized.toml /etc/commit-boost/config.toml +COPY ./keys.example.json ./keys.example.json +COPY ./metrics_jwt.txt ./metrics_jwt.txt + +# Expose the necessary ports for metrics +EXPOSE 13030 +EXPOSE 18551 +EXPOSE 33951 + +# Set the entrypoint with the 'start' subcommand and the correct config path +ENTRYPOINT ["commit-boost", "start", "/etc/commit-boost/config.toml"] diff --git a/Dockerfile.prometheus b/Dockerfile.prometheus new file mode 100644 index 00000000..58629990 --- /dev/null +++ b/Dockerfile.prometheus @@ -0,0 +1,13 @@ +FROM prom/prometheus:latest + +# Copy the Prometheus configuration file +COPY prometheus.yml /etc/prometheus/prometheus.yml + +# Expose the port for Prometheus +EXPOSE 9090 + +# Set the entrypoint +ENTRYPOINT ["/bin/prometheus"] + +# Set the command to use the custom configuration file +CMD ["--config.file=/etc/prometheus/prometheus.yml"] diff --git a/README.md b/README.md index 5d3110b6..8f673f0c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,31 @@ # Commit-Boost + +## Dependencies + +- Docker Desktop v4.29 or later. We are using the `host` network driver on docker container instantiation, so our client needs to support the feature. 
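The `host` networking requirement comes from how this patch launches module containers: `crates/cli/src/lib.rs` (further below) passes `network_mode: "host"` to bollard when it creates each module container. The following is a condensed, illustrative sketch of that call, not part of the patch itself; it assumes `bollard` and `tokio` are available as dependencies and reduces error handling to `?`:

```rust
use bollard::container::Config;
use bollard::secret::HostConfig;
use bollard::Docker;

// Condensed sketch of how this patch starts a module container on the host
// network (adapted from crates/cli/src/lib.rs below). The helper name is
// illustrative only.
async fn start_module_container(docker: &Docker, image: &str) -> Result<String, bollard::errors::Error> {
    let config = Config {
        image: Some(image.to_string()),
        host_config: Some(HostConfig {
            // Requires a Docker engine/Desktop version that supports the
            // `host` network driver, hence the dependency note above.
            network_mode: Some("host".to_string()),
            ..Default::default()
        }),
        ..Default::default()
    };

    let container = docker.create_container::<&str, String>(None, config).await?;
    docker.start_container::<String>(&container.id, None).await?;
    Ok(container.id)
}

#[tokio::main]
async fn main() -> Result<(), bollard::errors::Error> {
    let docker = Docker::connect_with_local_defaults()?;
    // "da_commit" is the module image name used in the example configs.
    let id = start_module_container(&docker, "da_commit").await?;
    println!("started module container {id}");
    Ok(())
}
```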
+
+## Usage
+Note: Ensure that the ports set up in config.toml and used by docker-compose are not already in use by other processes and are not blocked by a firewall.
+
+First, initialize the submodule with:
+```bash
+git submodule update --init --recursive
+```
+
+Then, build and run the Grafana and Prometheus docker containers with:
+```bash
+docker-compose build
+docker-compose up -d
+```
+
+Finally, run the project with:
+```bash
+cargo run --bin commit-boost -- start config.example.toml
+```
+
+## Configurations
+All configurable settings can be found in the config.example.toml file.
+
 ### Note
 - THIS IS JUST A PROOF OF CONCEPT TO ILLUSTRATE DESIGN PRINCIPLES, THERE ARE STILL COUNTLESS DECISIONS AND ASSUMPTIONS ABOUT THE DESIGN THAT NEED TO BE MADE
 - The code is unaudited and is far from being ready / NOT ready for production
diff --git a/config.dockerized.toml b/config.dockerized.toml
new file mode 100644
index 00000000..e3218c63
--- /dev/null
+++ b/config.dockerized.toml
@@ -0,0 +1,31 @@
+chain = "Holesky"
+
+[pbs]
+# path = "target/debug/custom_boost"
+address = "commit-boost:18551"
+relays = []
+relay_check = true
+timeout_get_header_ms = 950
+timeout_get_payload_ms = 4000
+timeout_register_validator_ms = 3000
+skip_sigverify = true
+min_bid_eth = 0.0
+
+[signer]
+address = "commit-boost:33951"
+[signer.loader]
+key_path = "keys.example.json"
+
+[metrics]
+address = "commit-boost:13030"
+jwt_path = "metrics_jwt.txt"
+
+# [[modules]]
+# id = "DA_COMMIT"
+# path = "target/debug/da_commit"
+# sleep_secs = 5
+
+[[modules]]
+id = "DA_COMMIT"
+docker_image="da_commit"
+sleep_secs = 5
\ No newline at end of file
diff --git a/config.example.toml b/config.example.toml
index 3dad10b9..38bb3451 100644
--- a/config.example.toml
+++ b/config.example.toml
@@ -2,7 +2,7 @@ chain = "Holesky"
 
 [pbs]
 # path = "target/debug/custom_boost"
-address = "127.0.0.1:18550"
+address = "127.0.0.1:18551"
 relays = []
 relay_check = true
 timeout_get_header_ms = 950
@@ -12,10 +12,18 @@ skip_sigverify = true
 min_bid_eth = 0.0
 
 [signer]
-address = "127.0.0.1:33950"
+address = "127.0.0.1:33951"
 [signer.loader]
 key_path = "keys.example.json"
 
+[metrics]
+address = "127.0.0.1:13030"
+
+# [[modules]]
+# id = "DA_COMMIT"
+# path = "target/debug/da_commit"
+# sleep_secs = 5
+
 [[modules]]
 id = "DA_COMMIT"
 docker_image="da_commit"
diff --git a/config.toml b/config.toml
new file mode 100644
index 00000000..d0ec32a9
--- /dev/null
+++ b/config.toml
@@ -0,0 +1,31 @@
+chain = "Holesky"
+
+[pbs]
+# path = "target/debug/custom_boost"
+address = "127.0.0.1:18550"
+relays = []
+relay_check = true
+timeout_get_header_ms = 950
+timeout_get_payload_ms = 4000
+timeout_register_validator_ms = 3000
+skip_sigverify = true
+min_bid_eth = 0.0
+
+[signer]
+address = "127.0.0.1:33950"
+[signer.loader]
+key_path = "keys.example.json"
+
+[metrics]
+address = "127.0.0.1:3030"
+jwt_path = "metrics_jwt.txt"
+
+# [[modules]]
+# id = "DA_COMMIT"
+# path = "target/debug/da_commit"
+# sleep_secs = 5
+
+[[modules]]
+id = "DA_COMMIT"
+docker_image="da_commit"
+sleep_secs = 5
\ No newline at end of file
diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml
index c407fa6f..30e2e58e 100644
--- a/crates/cli/Cargo.toml
+++ b/crates/cli/Cargo.toml
@@ -8,6 +8,7 @@ rust-version.workspace = true
 cb-common.workspace = true
 cb-pbs.workspace = true
 cb-crypto.workspace = true
+cb-metrics.workspace = true
 
 alloy-primitives.workspace = true
 alloy-rpc-types-beacon.workspace = true
diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs
index 292d3390..76d53a66 100644
---
a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -1,60 +1,24 @@ -use std::{collections::HashMap, process::Stdio}; +use std::{collections::HashMap, iter, process::Stdio}; use cb_common::{ - config::{CommitBoostConfig, CONFIG_PATH_ENV, MODULE_ID_ENV}, - utils::print_logo, + config::{CommitBoostConfig, CONFIG_PATH_ENV, JWT_ENV, METRICS_SERVER_URL, MODULE_ID_ENV}, pbs::DEFAULT_PBS_JWT_KEY, utils::print_logo }; use cb_crypto::service::SigningService; use cb_pbs::{BuilderState, DefaultBuilderApi, PbsService}; use clap::{Parser, Subcommand}; +use cb_metrics::docker_metrics_collector::DockerMetricsCollector; +use std::iter::Iterator; + #[derive(Parser, Debug)] #[command(version, about)] pub struct Args { #[command(subcommand)] - pub cmd: Command, - // /// Start with Holesky spec - // #[arg(long, global = true)] - // pub holesky: bool, + pub cmd: Command } #[derive(Debug, Subcommand)] pub enum Command { - // /// Start pbs module, signing server and commit modules - // Start { - // /// Address to start for boost server on - // #[arg(short, long, default_value = "127.0.0.1:18550", env = "BOOST_LISTEN_ADDR")] - // pbs_address: SocketAddr, - // /// Add a single relay (can be repeated or comma separated). Format is - // scheme://pubkey@host #[arg(short, long, visible_alias = "relays", env = "RELAYS", - // num_args = 1.., required = true, value_delimiter = ',')] relay: Vec, - // #[arg(long)] - // pbs: Option, - // /// Check relay status on startup and getStatus calls - // #[arg(long, env = "RELAY_STARTUP_CHECK")] - // relay_check: bool, - // /// Timeout in ms for calling getHeader to relays - // #[arg(long, default_value_t = 950, env = "RELAY_TIMEOUT_MS_GETHEADER")] - // timeout_get_header_ms: u64, - // /// Timeout in ms for calling getPayload to relays - // #[arg(long, default_value_t = 4000, env = "RELAY_TIMEOUT_MS_GETPAYLOAD")] - // timeout_get_payload_ms: u64, - // /// Timeout in ms for calling registerValidator to relays - // #[arg(long, default_value_t = 3000, env = "RELAY_TIMEOUT_MS_REGVAL")] - // timeout_register_validator_ms: u64, - // /// Skip signature verification for relay headers - // #[arg(long)] - // skip_sigverify: bool, - // /// Minimum bid to accept from relays in ETH - // #[arg(long, default_value_t = 0.0, env = "MIN_BID_ETH")] - // min_bid_eth: f64, - // /// Address where to start the service on - // #[arg(long, default_value = "127.0.0.1:33950", env = SIGNER_LISTEN_ADDR)] - // sign_address: SocketAddr, - // /// Path to executable - // #[arg(short, long, num_args = 1.., required = true, value_delimiter = ',')] - // module: Vec, - // }, Start { /// Path to config file config: String, @@ -67,53 +31,78 @@ impl Args { match self.cmd { Command::Start { config: config_path } => { + let config = CommitBoostConfig::from_file(&config_path); let signer_config = config.signer.expect("missing signer config with modules"); + let metrics_config = config.metrics.clone().expect("missing metrics config"); - // start signing server - // TODO: generate jwt for each module id + // TODO: Actually generate this token let pbs_jwt = "MY_PBS_TOKEN"; - let jwts = HashMap::from([("PBS_DEFAULT".into(), pbs_jwt.into())]); - let signer_address = signer_config.address; + const MODULE_JWT: &str = "JWT_FIXME"; // Initialize Docker client let docker = bollard::Docker::connect_with_local_defaults() .expect("Failed to connect to Docker"); if let Some(modules) = config.modules { + let jwts: HashMap = iter::once((DEFAULT_PBS_JWT_KEY.into(), pbs_jwt.into())) + .chain(modules.iter().map(|module| + // TODO: Generate 
token instead of hard-coding it. Think about persisting it across the project. + ( + module.id.clone(), + MODULE_JWT.into() + // format!("JWT_{}", module.id) + ))) + .collect(); + // start signing server - tokio::spawn(SigningService::run(config.chain, signer_config, jwts)); + tokio::spawn(SigningService::run(config.chain, signer_config.clone(), jwts.clone())); for module in modules { - let config = bollard::container::Config { + let container_config = bollard::container::Config { image: Some(module.docker_image.clone()), host_config: Some(bollard::secret::HostConfig { binds: { - let full_config_path = std::fs::canonicalize(&config_path) - .unwrap() - .to_string_lossy() - .to_string(); - Some(vec![format!("{}:{}", full_config_path, "/config.toml")]) + let full_config_path = std::fs::canonicalize(&config_path).unwrap().to_string_lossy().to_string(); + Some(vec![ + format!("{}:{}", full_config_path, "/config.toml"), + ]) }, network_mode: Some(String::from("host")), // Use the host network ..Default::default() }), - env: Some(vec![ - format!("{}={}", MODULE_ID_ENV, module.id), - format!("{}={}", CONFIG_PATH_ENV, "/config.toml"), - ]), + env: { + + let metrics_server_url = metrics_config.address; + + Some(vec![ + format!("{}={}", MODULE_ID_ENV, module.id), + format!("{}={}", CONFIG_PATH_ENV, "/config.toml"), + format!("{}={}", JWT_ENV, jwts.get(&module.id).unwrap()), + format!("{}={}", METRICS_SERVER_URL, metrics_server_url), + ]) + }, ..Default::default() }; - let container = - docker.create_container::<&str, String>(None, config).await?; + let container = docker.create_container::<&str, String>(None, container_config).await?; let container_id = container.id; - docker.start_container::(&container_id, None).await?; - println!( - "Started container: {} from image {}", - container_id, module.docker_image - ); + // start monitoring tasks for spawned modules + let metrics_config = metrics_config.clone(); + let cid = container_id.clone(); + tokio::spawn(async move { + DockerMetricsCollector::new( + vec![ + cid + ], + metrics_config.address.clone(), + // FIXME: The entire DockerMetricsCollector currently works with a single JWT; need to migrate to per-module JWT. 
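                            // Illustrative sketch only (not part of this patch): one way to replace the
                            // hard-coded MODULE_JWT below with a per-module token, assuming a `rand`
                            // dependency is added to crates/cli. The helper name is hypothetical.
                            //
                            //     fn generate_module_jwt() -> String {
                            //         use rand::RngCore;
                            //         let mut bytes = [0u8; 32];
                            //         rand::thread_rng().fill_bytes(&mut bytes);
                            //         bytes.iter().map(|b| format!("{:02x}", b)).collect()
                            //     }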
+ MODULE_JWT.to_string()).await + }); + + docker.start_container::(&container_id, None).await?; + println!("Started container: {} from image {}", container_id, module.docker_image); } } @@ -121,6 +110,7 @@ impl Args { if let Some(pbs_path) = config.pbs.path { let cmd = std::process::Command::new(pbs_path) .env(CONFIG_PATH_ENV, &config_path) + .env(JWT_ENV, pbs_jwt) .stdout(Stdio::inherit()) .stderr(Stdio::inherit()) .output() @@ -130,6 +120,7 @@ impl Args { eprintln!("Process failed with status: {}", cmd.status); } } else { + let signer_address = signer_config.address; let state = BuilderState::<()>::new(config.chain, config.pbs, signer_address, pbs_jwt); PbsService::run::<(), DefaultBuilderApi>(state).await; @@ -139,14 +130,4 @@ impl Args { Ok(()) } -} - -// fn deser_relay_vec(relays: Vec) -> Vec { -// relays -// .into_iter() -// .map(|s| { -// serde_json::from_str::(&format!("\"{}\"", s.trim())) -// .expect("invalid relay format, should be scheme://pubkey@host") -// }) -// .collect() -// } +} \ No newline at end of file diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 010fd39e..06c37c5d 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -11,6 +11,8 @@ use crate::{pbs::RelayEntry, signer::Signer, types::Chain}; pub const CONFIG_PATH_ENV: &str = "COMMIT_BOOST_CONFIG"; pub const MODULE_ID_ENV: &str = "COMMIT_BOOST_MODULE_ID"; +pub const JWT_ENV: &str = "JWT_TOKEN"; +pub const METRICS_SERVER_URL: &str = "METRICS_SERVER_URL"; #[derive(Debug, Deserialize, Serialize)] pub struct CommitBoostConfig { @@ -18,6 +20,7 @@ pub struct CommitBoostConfig { pub pbs: BuilderConfig, pub modules: Option>, pub signer: Option, + pub metrics: Option, } fn load_from_file(path: &str) -> T { @@ -42,7 +45,7 @@ impl CommitBoostConfig { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct SignerConfig { /// Where to start signing server pub address: SocketAddr, @@ -51,7 +54,13 @@ pub struct SignerConfig { pub loader: SignerLoader, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct MetricsConfig { + /// Where to start metrics server + pub address: SocketAddr, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(untagged)] pub enum SignerLoader { /// Plain text, do not use in prod @@ -148,6 +157,7 @@ pub fn load_module_config() -> StartModuleConfig { struct StubConfig { chain: Chain, signer: SignerConfig, + modules: Vec>, } diff --git a/crates/common/src/pbs/constants.rs b/crates/common/src/pbs/constants.rs index 76ce4d94..389b7135 100644 --- a/crates/common/src/pbs/constants.rs +++ b/crates/common/src/pbs/constants.rs @@ -12,3 +12,5 @@ pub const HEADER_KEY_VERSION: &str = "X-MEVBoost-Version"; // do we need to use pub const HEADER_START_TIME_UNIX_MS: &str = "X-MEVBoost-StartTimeUnixMS"; pub const BUILDER_EVENTS_PATH: &str = "/events"; + +pub const DEFAULT_PBS_JWT_KEY: &str = "DEFAULT_PBS"; \ No newline at end of file diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml new file mode 100644 index 00000000..2b5eedca --- /dev/null +++ b/crates/metrics/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "cb-metrics" +version.workspace = true +rust-version.workspace = true +edition.workspace = true + +[dependencies] +cb-common.workspace = true + +async-trait = "0.1.51" +bollard = "0.16.1" +futures-util = "0.3.15" +opentelemetry = "0.23.0" +opentelemetry-prometheus = "0.16" +opentelemetry_sdk = "0.23.0" +prometheus = "0.13" +serde = { version = 
"1.0", features = ["derive"] } +tokio = { version = "1.5.0", features = ["full"] } +warp = "0.3.0" +reqwest = { version = "0.11.0", features = ["json"] } \ No newline at end of file diff --git a/crates/metrics/src/docker_metrics_collector.rs b/crates/metrics/src/docker_metrics_collector.rs new file mode 100644 index 00000000..ed210415 --- /dev/null +++ b/crates/metrics/src/docker_metrics_collector.rs @@ -0,0 +1,208 @@ +use bollard::{container::StatsOptions, Docker}; +use futures_util::stream::TryStreamExt; +use opentelemetry::{global, KeyValue}; +use opentelemetry::metrics::ObservableGauge; +use prometheus::{Encoder, TextEncoder, Registry}; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::task; +use warp::{Filter, Reply}; + +#[derive(Clone)] +pub struct DockerMetricsCollector { + docker: Arc, + container_ids: Vec, + cpu_usage: ObservableGauge, + memory_usage: ObservableGauge, + registry: Registry, + jwt_token: String, +} + +#[derive(Deserialize, Serialize)] +struct RegisterMetricRequest { + name: String, + description: String, +} + +#[derive(Deserialize, Serialize)] +struct UpdateMetricRequest { + name: String, + value: f64, + labels: Vec<(String, String)>, +} + +impl DockerMetricsCollector { + pub async fn new(container_ids: Vec, addr: SocketAddr, jwt_token: String) -> Arc { + let docker = Docker::connect_with_local_defaults().expect("Failed to connect to Docker"); + let registry = Registry::new_custom(Some("docker_metrics".to_string()), None).unwrap(); + // Configure OpenTelemetry to use this registry + let exporter = opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build() + .expect("failed to build exporter"); + + let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() + .with_reader(exporter) + .build(); + + // NOTE: + // This line is crucial, since below we are using global::meter() to create meters (on custom meter registration) + // The current approach here might not be optimal, some deeper understanding of OpenTelemetry's philosophy is needed + global::set_meter_provider(provider.clone()); + + // let _exporter = exporter().with_registry(registry.clone()).init(); + let meter = global::meter("docker_metrics"); + let cpu_usage = meter + .f64_observable_gauge("cpu_usage") + .with_description("CPU Usage") + .init(); + let memory_usage = meter + .u64_observable_gauge("memory_usage") + .with_description("Memory Usage") + .init(); + + let collector = Arc::new(Self { + docker: Arc::new(docker), + container_ids, + cpu_usage, + memory_usage, + registry, + jwt_token, + }); + + let collector_clone = collector.clone(); + let addr = addr.to_string(); // Clone the address to move into the async block + task::spawn(async move { + collector_clone.start_http_server(&addr).await; + }); + + let collector_clone_for_metrics = collector.clone(); + task::spawn(async move { + collector_clone_for_metrics.collect_metrics().await; + }); + + collector + } + + async fn collect_metrics(&self) { + for container_id in &self.container_ids { + self.collect_docker_metrics(container_id.clone()).await; + } + } + + async fn collect_docker_metrics(&self, container_id: String) { + let docker = self.docker.clone(); + let cpu_usage = self.cpu_usage.clone(); + let memory_usage = self.memory_usage.clone(); + + tokio::spawn(async move { + let stats_stream = docker + .stats(&container_id, Some(StatsOptions { stream: true, one_shot: false })) + .map_ok(|stat| { + // //TODO: + // // Those crash since they're really reliant on 
an implicit initialization order.
+                    // // I've replaced them with a direct 0 to avoid crashes, but we must investigate how the proper calculations should happen here.
+                    // let cpu_delta = stat.cpu_stats.cpu_usage.total_usage - stat.precpu_stats.cpu_usage.total_usage;
+                    // let system_cpu_delta = stat.cpu_stats.system_cpu_usage.unwrap() - stat.precpu_stats.system_cpu_usage.unwrap_or_default();
+                    // let number_cpus = stat.cpu_stats.online_cpus.unwrap();
+                    let cpu_stats = 0f64; //(cpu_delta as f64 / system_cpu_delta as f64) * number_cpus as f64 * 100.0;
+                    let used_memory = stat.memory_stats.usage.unwrap_or_default();
+
+                    cpu_usage.observe(cpu_stats, &[KeyValue::new("container_id", container_id.clone())]);
+                    memory_usage.observe(used_memory, &[KeyValue::new("container_id", container_id.clone())]);
+                })
+                .try_collect::<Vec<_>>()
+                .await;
+
+            if let Err(e) = stats_stream {
+                eprintln!("Error collecting stats for container {}: {:?}", container_id, e);
+            }
+        });
+    }
+
+    async fn start_http_server(self: Arc<Self>, addr: &str) {
+        let metrics_route = warp::path("metrics").map({
+            let self_clone = self.clone();
+            move || {
+                let encoder = TextEncoder::new();
+                let metric_families = self_clone.registry.gather();
+                let buffer = encoder.encode_to_string(&metric_families).unwrap();
+                warp::http::Response::builder()
+                    .header("Content-Type", encoder.format_type())
+                    .body(buffer)
+            }
+        });
+
+        let register_metric_route = warp::path("register_custom_metric")
+            .and(warp::post())
+            .and(warp::body::json())
+            .and(warp::header("Authorization"))
+            .and_then({
+                let self_clone = self.clone();
+                move |req: RegisterMetricRequest, auth: String| {
+                    let self_clone = self_clone.clone();
+                    async move { self_clone.handle_register_custom_metric(req, auth).await }
+                }
+            });
+
+        let update_metric_route = warp::path("update_custom_metric")
+            .and(warp::post())
+            .and(warp::body::json())
+            .and(warp::header("Authorization"))
+            .and_then({
+                let self_clone = self.clone();
+                move |req: UpdateMetricRequest, auth: String| {
+                    let self_clone = self_clone.clone();
+                    async move { self_clone.handle_update_custom_metric(req, auth).await }
+                }
+            });
+
+        let routes = metrics_route
+            .or(register_metric_route)
+            .or(update_metric_route);
+
+        let addr: SocketAddr = addr.parse().expect("Invalid address");
+        warp::serve(routes).run(addr).await;
+    }
+
+    fn register_custom_gauge(&self, name: String, description: String) -> ObservableGauge<f64> {
+        let meter = global::meter("custom_metrics");
+        meter.f64_observable_gauge(name).with_description(description).init()
+    }
+
+    fn update_custom_gauge(&self, gauge: &ObservableGauge<f64>, value: f64, labels: &[KeyValue]) {
+        gauge.observe(value, labels);
+    }
+
+    async fn handle_register_custom_metric(&self, req: RegisterMetricRequest, auth: String) -> Result<impl Reply, warp::Rejection> {
+        if !self.validate_token(auth) {
+            return Ok(warp::reply::with_status("Unauthorized", warp::http::StatusCode::UNAUTHORIZED));
+        }
+
+        self.register_custom_gauge(req.name, req.description);
+        Ok(warp::reply::with_status("Metric registered", warp::http::StatusCode::OK))
+    }
+
+    async fn handle_update_custom_metric(&self, req: UpdateMetricRequest, auth: String) -> Result<impl Reply, warp::Rejection> {
+        if !self.validate_token(auth) {
+            return Ok(warp::reply::with_status("Unauthorized", warp::http::StatusCode::UNAUTHORIZED));
+        }
+
+        let gauge = self.register_custom_gauge(req.name, "".to_string()); // Assuming the gauge is already registered
+        let labels = req
+            .labels
+            .iter()
+            .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
+            .collect::<Vec<_>>();
+        self.update_custom_gauge(&gauge,
req.value, &labels); + Ok(warp::reply::with_status("Metric updated", warp::http::StatusCode::OK)) + } + + + fn validate_token(&self, token: String) -> bool { + // TODO: Parsing should probably not happen here (too late) + token.trim().replace("Bearer ", "") == self.jwt_token.trim() + } +} diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs new file mode 100644 index 00000000..e704cbe6 --- /dev/null +++ b/crates/metrics/src/lib.rs @@ -0,0 +1,2 @@ +pub mod sdk; +pub mod docker_metrics_collector; \ No newline at end of file diff --git a/crates/metrics/src/sdk.rs b/crates/metrics/src/sdk.rs new file mode 100644 index 00000000..d0c684a7 --- /dev/null +++ b/crates/metrics/src/sdk.rs @@ -0,0 +1,59 @@ +use cb_common::config::JWT_ENV; +use cb_common::config::METRICS_SERVER_URL; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::env; + +#[derive(Deserialize, Serialize)] +struct RegisterMetricRequest { + name: String, + description: String, +} + +#[derive(Deserialize, Serialize)] +struct UpdateMetricRequest { + name: String, + value: f64, + labels: Vec<(String, String)>, +} + +pub async fn register_custom_metric(name: &str, description: &str) -> Result<(), reqwest::Error> { + let server_url = env::var(METRICS_SERVER_URL).expect(&format!("{METRICS_SERVER_URL} is not set")); + let jwt_token = env::var(JWT_ENV).expect(&format!("{JWT_ENV} must be set")); + + let client = Client::new(); + let req = RegisterMetricRequest { + name: name.to_string(), + description: description.to_string(), + }; + + client.post(format!("http://{}/register_custom_metric", server_url)) + .header("Authorization", format!("Bearer {}", jwt_token)) + .json(&req) + .send() + .await? + .error_for_status()?; + + Ok(()) +} + +pub async fn update_custom_metric(name: &str, value: f64, labels: Vec<(String, String)>) -> Result<(), reqwest::Error> { + let server_url = env::var(METRICS_SERVER_URL).expect(&format!("{METRICS_SERVER_URL} is not set")); + let jwt_token = env::var(JWT_ENV).expect(&format!("{JWT_ENV} must be set")); + + let client = Client::new(); + let req = UpdateMetricRequest { + name: name.to_string(), + value, + labels, + }; + + client.post(format!("http://{}/update_custom_metric", server_url)) + .header("Authorization", format!("Bearer {}", jwt_token)) + .json(&req) + .send() + .await? + .error_for_status()?; + + Ok(()) +} diff --git a/da-commit-dockerized b/da-commit-dockerized new file mode 160000 index 00000000..156b3f3f --- /dev/null +++ b/da-commit-dockerized @@ -0,0 +1 @@ +Subproject commit 156b3f3f0c63d4301add47d6461b5c6ea31d8ec2 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..77518542 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,28 @@ +version: '3' + +services: + prometheus: + build: + context: . 
+ dockerfile: Dockerfile.prometheus + platform: linux/amd64 + ports: + - "9090:9090" # Only in case we'd want to inspect this in the browser, otherwise not needed + networks: + - monitoring + + grafana: + image: grafana/grafana:latest + platform: linux/amd64 + ports: + - "3000:3000" + networks: + - monitoring + depends_on: + - prometheus + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + +networks: + monitoring: + driver: bridge diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 425185b1..8397e0fb 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -9,6 +9,7 @@ cb-cli.workspace = true cb-common.workspace = true cb-pbs.workspace = true cb-crypto.workspace = true +cb-metrics.workspace = true alloy-rpc-types-beacon.workspace = true diff --git a/examples/custom_boost.rs b/examples/custom_boost.rs index 6d716a9a..5bbc7aa7 100644 --- a/examples/custom_boost.rs +++ b/examples/custom_boost.rs @@ -11,7 +11,7 @@ use axum::{ routing::get, Router, }; -use cb_common::{config::load_pbs_config, utils::initialize_tracing_log}; +use cb_common::{config::{load_module_config, load_pbs_config, CommitBoostConfig, JWT_ENV}, utils::initialize_tracing_log}; use cb_pbs::{BuilderApi, BuilderApiState, BuilderState, PbsService}; use tracing::info; @@ -59,11 +59,13 @@ async fn main() { info!("Starting custom pbs module"); - // TODO: pass these via config - let jwt = "my_jwt_token"; - let address = "0.0.0.0:18550".parse().unwrap(); + let jwt = std::env::var(JWT_ENV).expect(&format!("{JWT_ENV} not set")); + let address = CommitBoostConfig::from_env_path() + .signer + .expect("Signer config missing") + .address; - let state = BuilderState::new(chain, config, address, jwt); + let state = BuilderState::new(chain, config, address, &jwt); PbsService::run::(state).await; } diff --git a/examples/da_commit.rs b/examples/da_commit.rs index a815356b..a877d23e 100644 --- a/examples/da_commit.rs +++ b/examples/da_commit.rs @@ -3,10 +3,11 @@ use std::time::Duration; use alloy_rpc_types_beacon::{BlsPublicKey, BlsSignature}; use cb_common::{ commit::{client::SignerClient, request::SignRequest}, - config::{load_module_config, ModuleConfig}, + config::{load_module_config, ModuleConfig, JWT_ENV}, utils::initialize_tracing_log, }; use eyre::OptionExt; +use cb_metrics::sdk::{register_custom_metric, update_custom_metric}; use serde::Deserialize; use tokio::time::sleep; use tracing::{error, info}; @@ -39,11 +40,17 @@ impl DaCommitService { loop { self.send_request(data, *pubkey).await?; + + update_custom_metric("custom_metric", 42.0, vec![("label_key".to_string(), "label_value".to_string())]) + .await + .expect("Failed to update custom metric"); + sleep(Duration::from_secs(self.config.extra.sleep_secs)).await; data += 1; } } + pub async fn send_request(&self, data: u64, pubkey: BlsPublicKey) -> eyre::Result<()> { let datagram = Datagram { data }; let request = SignRequest::builder(&self.config.id, pubkey).with_msg(&datagram); @@ -62,10 +69,12 @@ async fn main() { let config = load_module_config::(); + register_custom_metric("custom_metric", "A custom metric for demonstration").await.expect("Failed to register custom metric."); + info!(module_id = config.config.id, "Starting module"); // TODO: pass this via the module config - let jwt = "my_jwt_token"; + let jwt = &std::env::var(JWT_ENV).expect(&format!("{JWT_ENV} not set")); let client = SignerClient::new(config.sign_address, jwt).expect("failed to create client"); let service = DaCommitService { config: config.config, signer_client: client }; diff --git 
a/prometheus.yml b/prometheus.yml
new file mode 100644
index 00000000..0a13f047
--- /dev/null
+++ b/prometheus.yml
@@ -0,0 +1,8 @@
+global:
+  scrape_interval: 15s
+
+scrape_configs:
+  - job_name: 'rust_metrics'
+    static_configs:
+      # Target the http server provided by the native CB app (not yet dockerized, that's why we need `host.docker.internal`)
+      - targets: ['host.docker.internal:13030']
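As a closing illustration of the metrics path wired up above, the sketch below exercises the collector's HTTP API end to end: it registers a custom gauge, pushes one value, and then fetches the same `/metrics` endpoint that the `rust_metrics` scrape job targets. This is not part of the patch; it assumes the collector is reachable at `127.0.0.1:13030` (as in `config.example.toml`), that the token matches the hard-coded `JWT_FIXME` placeholder, and that `tokio`, `reqwest` (with the `json` feature) and `serde_json` are available as dependencies:

```rust
use std::env;

// Sketch of an end-to-end check against the DockerMetricsCollector HTTP API:
// register a custom gauge, push a value, then read back the Prometheus text
// exposition that the `rust_metrics` scrape job above consumes. The metric
// name and label below are illustrative placeholders.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let server = env::var("METRICS_SERVER_URL").unwrap_or_else(|_| "127.0.0.1:13030".to_string());
    let jwt = env::var("JWT_TOKEN").unwrap_or_else(|_| "JWT_FIXME".to_string());
    let client = reqwest::Client::new();

    // Bodies match the RegisterMetricRequest / UpdateMetricRequest structs in
    // crates/metrics/src/docker_metrics_collector.rs.
    client
        .post(format!("http://{server}/register_custom_metric"))
        .header("Authorization", format!("Bearer {jwt}"))
        .json(&serde_json::json!({ "name": "demo_metric", "description": "demo" }))
        .send()
        .await?
        .error_for_status()?;

    client
        .post(format!("http://{server}/update_custom_metric"))
        .header("Authorization", format!("Bearer {jwt}"))
        .json(&serde_json::json!({ "name": "demo_metric", "value": 1.0, "labels": [["source", "smoke_test"]] }))
        .send()
        .await?
        .error_for_status()?;

    // The same endpoint Prometheus scrapes via the job above.
    let body = client.get(format!("http://{server}/metrics")).send().await?.text().await?;
    println!("{body}");
    Ok(())
}
```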