Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graph: Add endpoint metrics #4430

Merged
merged 7 commits into from
Mar 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 48 additions & 44 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::Error;
use graph::{
endpoint::EndpointMetrics,
env::env_var,
firehose::SubgraphLimit,
log::logger,
prelude::{prost, tokio, tonic},
{firehose, firehose::FirehoseEndpoint},
};
Expand All @@ -20,13 +22,18 @@ async fn main() -> Result<(), Error> {
token = Some(token_env);
}

let logger = logger(false);
let host = "https://api.streamingfast.io:443".to_string();
let metrics = Arc::new(EndpointMetrics::new(logger, &[host.clone()]));

let firehose = Arc::new(FirehoseEndpoint::new(
"firehose",
"https://api.streamingfast.io:443",
&host,
token,
false,
false,
SubgraphLimit::Unlimited,
metrics,
));

loop {
Expand Down
2 changes: 1 addition & 1 deletion chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ impl Blockchain for Chain {
) -> Result<BlockPtr, IngestorError> {
match self.client.as_ref() {
ChainClient::Firehose(endpoints) => endpoints
.random()?
.endpoint()?
.block_ptr_for_number::<HeaderOnlyBlock>(logger, number)
.await
.map_err(IngestorError::Unknown),
Expand Down
4 changes: 4 additions & 0 deletions chain/substreams/examples/substreams.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{format_err, Context, Error};
use graph::blockchain::block_stream::BlockStreamEvent;
use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
use graph::endpoint::EndpointMetrics;
use graph::firehose::SubgraphLimit;
use graph::prelude::{info, tokio, DeploymentHash, Registry};
use graph::tokio_stream::StreamExt;
Expand Down Expand Up @@ -41,13 +42,16 @@ async fn main() -> Result<(), Error> {
prometheus_registry.clone(),
));

let endpoint_metrics = EndpointMetrics::new(logger.clone(), &[endpoint.clone()]);

let firehose = Arc::new(FirehoseEndpoint::new(
"substreams",
&endpoint,
token,
false,
false,
SubgraphLimit::Unlimited,
Arc::new(endpoint_metrics),
));

let mut stream: SubstreamsBlockStream<graph_chain_substreams::Chain> =
Expand Down
2 changes: 1 addition & 1 deletion graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ chrono = "0.4.23"
envconfig = "0.10.0"
Inflector = "0.11.3"
isatty = "0.1.9"
reqwest = { version = "0.11.2", features = ["json", "stream", "multipart"] }
reqwest = { version = "0.11.14", features = ["json", "stream", "multipart"] }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated to the rest of the change, was just looking into middleware options and noticed this was "outdated"

ethabi = "17.2"
hex = "0.4.3"
http = "0.2.3"
Expand Down
4 changes: 2 additions & 2 deletions graph/src/blockchain/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl<C: Blockchain> ChainClient<C> {
// adapter limits in the configuration can effectively disable firehose
// by setting a limit to 0.
// In this case we should fallback to an rpc client.
let firehose_available = firehose_endpoints.random().is_ok();
let firehose_available = firehose_endpoints.endpoint().is_ok();

match firehose_available {
true => Self::Firehose(firehose_endpoints),
Expand All @@ -42,7 +42,7 @@ impl<C: Blockchain> ChainClient<C> {

pub fn firehose_endpoint(&self) -> anyhow::Result<Arc<FirehoseEndpoint>> {
match self {
ChainClient::Firehose(endpoints) => endpoints.random(),
ChainClient::Firehose(endpoints) => endpoints.endpoint(),
_ => Err(anyhow!("firehose endpoint requested on rpc chain client")),
}
}
Expand Down
1 change: 1 addition & 0 deletions graph/src/blockchain/firehose_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
"start_block" => start_block_num,
"subgraph" => &deployment,
"cursor" => latest_cursor.to_string(),
"provider_err_count" => endpoint.current_error_count(),
);

// We just reconnected, assume that we want to back off on errors
Expand Down
Loading