improve(metrics): Turn "Flushing logs to Elasticsearch" log into metric #4333

Merged (1 commit, Feb 3, 2023)
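Summary of the change: the per-flush `debug!(flush_logger, "Flushing {} logs to Elasticsearch", logs_to_send.len())` message in `ElasticDrain` is replaced by a Prometheus counter, `graph_elasticsearch_logs_sent`, incremented by the batch size on each flush. The counter is threaded through `ElasticDrain::new`, `elastic_logger`, and `LoggerFactory`, so `LoggerFactory::new` now also takes a `MetricsRegistry`. Below is a minimal, self-contained sketch of that pattern; it uses the `prometheus` crate directly, which is an assumption for illustration only — in the PR the counter comes from graph-node's `MetricsRegistry` wrapper.

```rust
// Sketch only: register a counter and record a flushed batch on it instead of
// logging the batch size. Metric name and help text mirror the PR; the direct
// `prometheus` calls stand in for graph-node's MetricsRegistry wrapper.
use prometheus::{Counter, Encoder, Opts, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();
    let logs_sent_counter = Counter::with_opts(Opts::new(
        "graph_elasticsearch_logs_sent",
        "Count of logs sent to Elasticsearch endpoint",
    ))
    .unwrap();
    registry
        .register(Box::new(logs_sent_counter.clone()))
        .unwrap();

    // Previously: debug!(flush_logger, "Flushing {} logs to Elasticsearch", logs_to_send.len());
    let logs_to_send = vec!["log 1", "log 2", "log 3"];
    logs_sent_counter.inc_by(logs_to_send.len() as f64);

    // The flush volume now shows up on the Prometheus scrape endpoint.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .unwrap();
    println!("{}", String::from_utf8(buf).unwrap());
}
```

The diffs below show how the counter is created via `MetricsRegistry::global_deployment_counter` and injected into the drain.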
graph/src/log/elastic.rs (25 changes: 16 additions & 9 deletions)
@@ -8,6 +8,7 @@ use std::time::Duration;
 use chrono::prelude::{SecondsFormat, Utc};
 use futures03::TryFutureExt;
 use http::header::CONTENT_TYPE;
+use prometheus::Counter;
 use reqwest;
 use reqwest::Client;
 use serde::ser::Serializer as SerdeSerializer;
@@ -175,15 +176,21 @@ pub struct ElasticDrainConfig {
 pub struct ElasticDrain {
     config: ElasticDrainConfig,
     error_logger: Logger,
+    logs_sent_counter: Counter,
     logs: Arc<Mutex<Vec<ElasticLog>>>,
 }
 
 impl ElasticDrain {
     /// Creates a new `ElasticDrain`.
-    pub fn new(config: ElasticDrainConfig, error_logger: Logger) -> Self {
+    pub fn new(
+        config: ElasticDrainConfig,
+        error_logger: Logger,
+        logs_sent_counter: Counter,
+    ) -> Self {
         let drain = ElasticDrain {
             config,
             error_logger,
+            logs_sent_counter,
             logs: Arc::new(Mutex::new(vec![])),
         };
         drain.periodically_flush_logs();
@@ -192,6 +199,7 @@ impl ElasticDrain {
 
     fn periodically_flush_logs(&self) {
         let flush_logger = self.error_logger.clone();
+        let logs_sent_counter = self.logs_sent_counter.clone();
         let logs = self.logs.clone();
         let config = self.config.clone();
         let mut interval = tokio::time::interval(self.config.flush_interval);
@@ -203,7 +211,6 @@
 
             let logs = logs.clone();
             let config = config.clone();
-            let flush_logger = flush_logger.clone();
             let logs_to_send = {
                 let mut logs = logs.lock().unwrap();
                 let logs_to_send = (*logs).clone();
@@ -217,11 +224,7 @@
                 continue;
             }
 
-            debug!(
-                flush_logger,
-                "Flushing {} logs to Elasticsearch",
-                logs_to_send.len()
-            );
+            logs_sent_counter.inc_by(logs_to_send.len() as f64);
 
             // The Elasticsearch batch API takes requests with the following format:
             // ```ignore
@@ -382,8 +385,12 @@ impl Drain for ElasticDrain {
 ///
 /// Uses `error_logger` to print any Elasticsearch logging errors,
 /// so they don't go unnoticed.
-pub fn elastic_logger(config: ElasticDrainConfig, error_logger: Logger) -> Logger {
-    let elastic_drain = ElasticDrain::new(config, error_logger).fuse();
+pub fn elastic_logger(
+    config: ElasticDrainConfig,
+    error_logger: Logger,
+    logs_sent_counter: Counter,
+) -> Logger {
+    let elastic_drain = ElasticDrain::new(config, error_logger, logs_sent_counter).fuse();
     let async_drain = slog_async::Async::new(elastic_drain)
         .chan_size(20000)
         .build()
graph/src/log/factory.rs (25 changes: 24 additions & 1 deletion)
@@ -1,5 +1,9 @@
 use std::sync::Arc;
 
+use prometheus::Counter;
+use slog::*;
+
+use crate::components::metrics::MetricsRegistry;
 use crate::components::store::DeploymentLocator;
 use crate::log::elastic::*;
 use crate::log::split::*;
@@ -20,14 +24,20 @@ pub struct ComponentLoggerConfig {
 pub struct LoggerFactory {
     parent: Logger,
     elastic_config: Option<ElasticLoggingConfig>,
+    metrics_registry: Arc<dyn MetricsRegistry>,
 }
 
 impl LoggerFactory {
     /// Creates a new factory using a parent logger and optional Elasticsearch configuration.
-    pub fn new(logger: Logger, elastic_config: Option<ElasticLoggingConfig>) -> Self {
+    pub fn new(
+        logger: Logger,
+        elastic_config: Option<ElasticLoggingConfig>,
+        metrics_registry: Arc<dyn MetricsRegistry>,
+    ) -> Self {
         Self {
             parent: logger,
             elastic_config,
+            metrics_registry,
         }
     }
 
@@ -36,6 +46,7 @@ impl LoggerFactory {
         Self {
             parent,
             elastic_config: self.elastic_config.clone(),
+            metrics_registry: self.metrics_registry.clone(),
         }
     }
 
@@ -68,6 +79,7 @@ impl LoggerFactory {
                             max_retries: ENV_VARS.elastic_search_max_retries,
                         },
                         term_logger.clone(),
+                        self.logs_sent_counter(None),
                     ),
                 )
             })
@@ -98,9 +110,20 @@ impl LoggerFactory {
                             max_retries: ENV_VARS.elastic_search_max_retries,
                         },
                         term_logger.clone(),
+                        self.logs_sent_counter(Some(loc.hash.as_str())),
                     ),
                 )
             })
             .unwrap_or(term_logger)
     }
+
+    fn logs_sent_counter(&self, deployment: Option<&str>) -> Counter {
+        self.metrics_registry
+            .global_deployment_counter(
+                "graph_elasticsearch_logs_sent",
+                "Count of logs sent to Elasticsearch endpoint",
+                deployment.unwrap_or(""),
+            )
+            .unwrap()
+    }
 }
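The new `logs_sent_counter` helper registers `graph_elasticsearch_logs_sent` through `MetricsRegistry::global_deployment_counter`, passing the deployment hash for subgraph loggers and an empty string for the component-level logger. The sketch below illustrates that per-deployment labeling idea with a plain `prometheus::CounterVec`; the `deployment` label and the helper shape here are assumptions for illustration, not how graph-node's `MetricsRegistry` is implemented.

```rust
// Sketch, not graph-node's MetricsRegistry: model the per-deployment counter
// as a CounterVec keyed by a `deployment` label. The component logger uses an
// empty label value; each subgraph logger uses its deployment hash.
use prometheus::{CounterVec, Opts, Registry};

fn logs_sent_counter(registry: &Registry) -> CounterVec {
    let counter = CounterVec::new(
        Opts::new(
            "graph_elasticsearch_logs_sent",
            "Count of logs sent to Elasticsearch endpoint",
        ),
        &["deployment"],
    )
    .unwrap();
    registry.register(Box::new(counter.clone())).unwrap();
    counter
}

fn main() {
    let registry = Registry::new();
    let logs_sent = logs_sent_counter(&registry);

    // Component-level logger: no deployment, empty label value.
    logs_sent.with_label_values(&[""]).inc_by(10.0);
    // Subgraph logger, labeled with a hypothetical deployment hash.
    logs_sent
        .with_label_values(&["QmExampleDeploymentHash"])
        .inc_by(25.0);

    assert_eq!(logs_sent.with_label_values(&[""]).get(), 10.0);
}
```

Keeping one metric name for all loggers means dashboards can sum the flush volume across deployments or break it down per subgraph.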
node/src/main.rs (17 changes: 9 additions & 8 deletions)
@@ -191,8 +191,16 @@ async fn main() {
         client: reqwest::Client::new(),
     });
 
+    // Set up Prometheus registry
+    let prometheus_registry = Arc::new(Registry::new());
+    let metrics_registry = Arc::new(MetricsRegistry::new(
+        logger.clone(),
+        prometheus_registry.clone(),
+    ));
+
     // Create a component and subgraph logger factory
-    let logger_factory = LoggerFactory::new(logger.clone(), elastic_config);
+    let logger_factory =
+        LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone());
 
     // Try to create IPFS clients for each URL specified in `--ipfs`
     let ipfs_clients: Vec<_> = create_ipfs_clients(&logger, &opt.ipfs);
@@ -207,13 +215,6 @@ async fn main() {
     // Convert the clients into a link resolver. Since we want to get past
     // possible temporary DNS failures, make the resolver retry
     let link_resolver = Arc::new(LinkResolver::new(ipfs_clients, env_vars.cheap_clone()));
-
-    // Set up Prometheus registry
-    let prometheus_registry = Arc::new(Registry::new());
-    let metrics_registry = Arc::new(MetricsRegistry::new(
-        logger.clone(),
-        prometheus_registry.clone(),
-    ));
     let mut metrics_server =
         PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone());
 
node/src/manager/commands/run.rs (2 changes: 1 addition & 1 deletion)
@@ -57,7 +57,7 @@ pub async fn run(
 
     let env_vars = Arc::new(EnvVars::from_env().unwrap());
     let metrics_registry = metrics_ctx.registry.clone();
-    let logger_factory = LoggerFactory::new(logger.clone(), None);
+    let logger_factory = LoggerFactory::new(logger.clone(), None, metrics_ctx.registry.clone());
 
     // FIXME: Hard-coded IPFS config, take it from config file instead?
     let ipfs_clients: Vec<_> = create_ipfs_clients(&logger, &ipfs_url);
server/http/tests/server.rs (13 changes: 9 additions & 4 deletions)
@@ -89,6 +89,8 @@ impl GraphQlRunner for TestGraphQlRunner {
 
 #[cfg(test)]
 mod test {
+    use graph_mock::MockMetricsRegistry;
+
     use super::*;
 
     lazy_static! {
@@ -101,7 +103,7 @@ mod test {
         runtime
             .block_on(async {
                 let logger = Logger::root(slog::Discard, o!());
-                let logger_factory = LoggerFactory::new(logger, None);
+                let logger_factory = LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
                 let id = USERS.clone();
                 let query_runner = Arc::new(TestGraphQlRunner);
                 let node_id = NodeId::new("test").unwrap();
@@ -142,7 +144,8 @@ mod test {
         let runtime = tokio::runtime::Runtime::new().unwrap();
         runtime.block_on(async {
             let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
@@ -222,7 +225,8 @@ mod test {
         let runtime = tokio::runtime::Runtime::new().unwrap();
         runtime.block_on(async {
             let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
@@ -267,7 +271,8 @@ mod test {
         let runtime = tokio::runtime::Runtime::new().unwrap();
         let _ = runtime.block_on(async {
            let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
tests/src/fixture.rs (2 changes: 1 addition & 1 deletion)
@@ -377,8 +377,8 @@ pub async fn setup<C: Blockchain>(
     });
 
     let logger = graph::log::logger(true);
-    let logger_factory = LoggerFactory::new(logger.clone(), None);
     let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
+    let logger_factory = LoggerFactory::new(logger.clone(), None, mock_registry.clone());
     let node_id = NodeId::new(NODE_ID).unwrap();
 
     // Make sure we're starting from a clean state.
tests/src/fixture/ethereum.rs (4 changes: 2 additions & 2 deletions)
@@ -31,9 +31,9 @@ pub async fn chain(
         x: PhantomData,
     }));
     let logger = graph::log::logger(true);
-    let logger_factory = LoggerFactory::new(logger.cheap_clone(), None);
-    let node_id = NodeId::new(NODE_ID).unwrap();
     let mock_registry = Arc::new(MockMetricsRegistry::new());
+    let logger_factory = LoggerFactory::new(logger.cheap_clone(), None, mock_registry.clone());
+    let node_id = NodeId::new(NODE_ID).unwrap();
 
     let chain_store = stores.chain_store.cheap_clone();
 