Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Grafana integration #3913

Merged
merged 43 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c284e28
Very WIP
expenses Oct 25, 2019
eda126f
record_metrics macro works
expenses Oct 25, 2019
d2c1753
Integrate into service
expenses Oct 25, 2019
df40107
Licenses and documentation
expenses Oct 25, 2019
eb5ac01
Remove unused Debugs, make respond function clearer
expenses Oct 25, 2019
b62f12f
Conform to line widths, fix service test
expenses Oct 25, 2019
9947f72
Switch to storing the timestamps as millis instead
expenses Oct 25, 2019
562e588
Update core/grafana-data-source/src/lib.rs
expenses Oct 25, 2019
6384f25
Transform timestamps to i64 in serialization
expenses Oct 25, 2019
674e7eb
Merge branch 'grafana-data-source' of https://github.com/expenses/sub…
expenses Oct 25, 2019
0815876
Fix license date
expenses Oct 25, 2019
05b92b7
Binary sort to find selection range for metrics
expenses Oct 25, 2019
003613e
Obey maxDataPoints
expenses Oct 26, 2019
de23d0c
Run a cleaning future
expenses Oct 27, 2019
1d4b25d
Newlines at EOF
expenses Oct 27, 2019
9ba57df
Merge branch 'master' into grafana-data-source
gavofyork Oct 28, 2019
2d46fa8
Update core/service/Cargo.toml
expenses Oct 28, 2019
e700971
Update core/grafana-data-source/src/lib.rs
expenses Oct 28, 2019
db7a711
Fix indentation
expenses Oct 28, 2019
b314fb2
Improve select_points
expenses Oct 28, 2019
7bf30bb
Merge branch 'master' into grafana-data-source
expenses Nov 3, 2019
02a695d
Made test more accurate
expenses Nov 3, 2019
df13752
Inprogress
expenses Nov 7, 2019
5d7b006
Merge branch 'master' into async-await
expenses Nov 8, 2019
483c894
Use the same futures version as hyper for now
expenses Nov 8, 2019
4e96385
Error handling
expenses Nov 8, 2019
a389d96
Merge remote-tracking branch 'parity/master' into grafana-data-source
expenses Nov 14, 2019
2d9bcd0
Merge remote-tracking branch 'parity/master' into grafana-data-source
expenses Nov 18, 2019
89c2e0b
Merge remote-tracking branch 'parity/master' into grafana-data-source
expenses Nov 19, 2019
f782858
Merge remote-tracking branch 'parity/master' into grafana-data-source
expenses Nov 20, 2019
9605717
Remove dependence on hyper's tokio feature
expenses Nov 20, 2019
b48dae0
Added target_os flag
expenses Nov 20, 2019
a7b5900
Update Cargo.toml
expenses Nov 20, 2019
a3b2298
Simplify example
expenses Nov 20, 2019
0adbb92
Remove compat wildcard
expenses Nov 20, 2019
5e03faf
Merge branch 'master' into grafana-data-source
expenses Nov 21, 2019
5100677
Merge remote-tracking branch 'parity/master' into grafana-data-source
expenses Nov 22, 2019
53201dd
Merge branch 'grafana-data-source' of https://github.com/expenses/sub…
expenses Nov 22, 2019
6a97c3f
Merge branch 'master' into grafana-data-source
gavofyork Nov 22, 2019
b2f66ce
Merge remote-tracking branch 'parity/master' into grafana-data-source
expenses Nov 22, 2019
7adac1b
Updated lock file
expenses Nov 22, 2019
56429d2
Merge branch 'master' into grafana-data-source
gavofyork Nov 22, 2019
e7adcfb
Fix indentation 😉
expenses Nov 22, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ members = [
"core/externalities",
"core/finality-grandpa",
"core/finality-grandpa/primitives",
"core/grafana-data-source",
"core/grafana-data-source/test",
"core/inherents",
"core/keyring",
"core/keystore",
Expand Down
4 changes: 4 additions & 0 deletions core/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,9 +735,13 @@ where

let rpc_interface: &str = if cli.rpc_external { "0.0.0.0" } else { "127.0.0.1" };
let ws_interface: &str = if cli.ws_external { "0.0.0.0" } else { "127.0.0.1" };
let grafana_interface: &str = if cli.grafana_external { "0.0.0.0" } else { "127.0.0.1" };

config.rpc_http = Some(parse_address(&format!("{}:{}", rpc_interface, 9933), cli.rpc_port)?);
config.rpc_ws = Some(parse_address(&format!("{}:{}", ws_interface, 9944), cli.ws_port)?);
config.grafana_port = Some(
parse_address(&format!("{}:{}", grafana_interface, 9955), cli.grafana_port)?
);

config.rpc_ws_max_connections = cli.ws_max_connections;
config.rpc_cors = cli.rpc_cors.unwrap_or_else(|| if is_dev {
Expand Down
10 changes: 10 additions & 0 deletions core/cli/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ pub struct RunCmd {
#[structopt(long = "ws-external")]
pub ws_external: bool,

/// Listen to all Grafana data source interfaces.
///
/// Default is local.
#[structopt(long = "grafana-external")]
pub grafana_external: bool,

/// Specify HTTP RPC server TCP port.
#[structopt(long = "rpc-port", value_name = "PORT")]
pub rpc_port: Option<u16>,
Expand All @@ -354,6 +360,10 @@ pub struct RunCmd {
#[structopt(long = "rpc-cors", value_name = "ORIGINS", parse(try_from_str = parse_cors))]
pub rpc_cors: Option<Cors>,

/// Specify Grafana data source server TCP Port.
#[structopt(long = "grafana-port", value_name = "PORT")]
pub grafana_port: Option<u16>,

/// Specify the state pruning mode, a number of blocks to keep or 'archive'.
///
/// Default is to keep all block states if the node is running as a
Expand Down
18 changes: 18 additions & 0 deletions core/grafana-data-source/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
description = "Grafana data source server"
name = "grafana-data-source"
version = "2.0.0"
license = "GPL-3.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"

[dependencies]
hyper = "0.12"
futures = "0.1"
serde_json = "1"
serde = { version = "1", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
lazy_static = "1.4"
parking_lot = "0.9"
tokio-timer = "0.2"
stream-cancel = "0.4"
55 changes: 55 additions & 0 deletions core/grafana-data-source/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! [Grafana] data source server
//!
//! To display node statistics with [Grafana], this module exposes a `run_server` function that
//! starts up a HTTP server that conforms to the [`grafana-json-data-source`] API. The
//! `record_metrics` macro can be used to pass metrics to this server.
//!
//! [Grafana]: https://grafana.com/
//! [`grafana-json-data-source`]: https://github.com/simPod/grafana-json-datasource

use lazy_static::lazy_static;
use std::collections::HashMap;
use parking_lot::RwLock;

mod types;
mod server;
mod util;

pub use server::run_server;
pub use util::now_millis;

type Metrics = HashMap<&'static str, Vec<(f32, i64)>>;

lazy_static! {
/// The `RwLock` wrapping the metrics. Not intended to be used directly.
pub static ref METRICS: RwLock<Metrics> = RwLock::new(Metrics::new());
expenses marked this conversation as resolved.
Show resolved Hide resolved
}

/// Write metrics to `METRICS`.
#[macro_export]
macro_rules! record_metrics(
($($key:expr => $value:expr),*) => {
use $crate::{METRICS, now_millis};
let mut metrics = METRICS.write();
let now = now_millis();
$(
metrics.entry($key).or_insert_with(Vec::new).push(($value as f32, now));
)*
}
);
134 changes: 134 additions & 0 deletions core/grafana-data-source/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use serde::{Serialize, de::DeserializeOwned};
use hyper::{Body, Request, Response, header, service::service_fn, Server};
use futures::{future, Future, stream::Stream};
use chrono::{Duration, Utc};
use stream_cancel::StreamExt;
use crate::{METRICS, util, types::{Target, Query, TimeseriesData}};

type GenericError = Box<dyn std::error::Error + Send + Sync>;
type ResponseFuture = Box<dyn Future<Item=Response<Body>, Error=GenericError> + Send>;

fn api_response(req: Request<Body>) -> ResponseFuture {
match req.uri().path() {
"/" => Box::new(future::ok(Response::new(Body::empty()))),
"/search" => map_request_to_response(req, |target: Target| {
// Filter and return metrics relating to the target
METRICS.read()
.keys()
.filter(|key| key.starts_with(&target.target))
.cloned()
.collect::<Vec<_>>()
}),
"/query" => {
map_request_to_response(req, |query: Query| {
let metrics = METRICS.read();

// Return timeseries data related to the specified metrics
query.targets.iter()
.map(|target| {
let datapoints = metrics.get(target.target.as_str())
.map(|metric| {
let from = util::find_index(&metric, query.range.from);
let to = util::find_index(&metric, query.range.to);

let metric = &metric[from .. to];

if metric.len() > query.max_datapoints {
// Avoid returning more than `max_datapoints` (mostly to stop
// the web browser from having to do a ton of work)
util::select_points(metric, query.max_datapoints)
} else {
metric.to_vec()
}
})
.unwrap_or_else(Vec::new);

TimeseriesData {
target: target.target.clone(), datapoints
}
})
.collect::<Vec<_>>()
})
},
_ => Box::new(future::ok(Response::new(Body::empty()))),
}
}

fn map_request_to_response<Req, Res, T>(req: Request<Body>, transformation: T) -> ResponseFuture
where
Req: DeserializeOwned,
Res: Serialize,
T: Fn(Req) -> Res + Send + Sync + 'static
{
Box::new(req.into_body()
.concat2()
.from_err()
.and_then(move |entire_body| {
let req = serde_json::from_slice(entire_body.as_ref())?;
let res = transformation(req);

let string = serde_json::to_string(&res)?;

Response::builder()
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(string))
.map_err(|e| e.into())
})
)
}

/// Start the data source server.
///
/// The server shuts down cleanly when `shutdown` resolves.
pub fn run_server<F>(address: &std::net::SocketAddr, shutdown: F) -> impl Future<Item=(), Error=()>
expenses marked this conversation as resolved.
Show resolved Hide resolved
where F: Future<Item=(), Error=()> + Clone
{
Server::bind(address)
.serve(|| service_fn(api_response))
.with_graceful_shutdown(shutdown.clone())
.map_err(|_| ())
// Clean up week-old metrics once a day
.join(clean_up(Duration::days(1), Duration::weeks(1), shutdown))
.map(|_| ())
}

// Remove all metrics before a certain duration every so often.
fn clean_up<F>(every: Duration, before: Duration, exit: F) -> impl Future<Item=(), Error=()>
where F: Future<Item=(), Error=()>
{
tokio_timer::Interval::new_interval(every.to_std().unwrap())
.take_until(exit)
.for_each(move |_| {
let oldest_allowed = (Utc::now() - before).timestamp_millis();

let mut metrics = METRICS.write();

for metric in metrics.values_mut() {
// Find the index of the oldest allowed timestamp and cut out all those before it.
let index = util::find_index(&metric, oldest_allowed);

if index > 0 {
*metric = metric[index..].to_vec();
}
}

futures::future::ok(())
})
.map_err(|_| ())
}
Loading