diff --git a/Cargo.lock b/Cargo.lock index cccc4116744..659a371fba8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4389,6 +4389,7 @@ version = "0.28.2" dependencies = [ "diesel", "graph", + "graph-chain-ethereum", "graph-graphql", "graph-mock", "graph-node", @@ -4396,6 +4397,7 @@ dependencies = [ "graphql-parser", "hex-literal", "lazy_static", + "prost-types", "serde", ] diff --git a/chain/ethereum/src/codec.rs b/chain/ethereum/src/codec.rs index f80b9bceb42..211a70a00c8 100644 --- a/chain/ethereum/src/codec.rs +++ b/chain/ethereum/src/codec.rs @@ -4,7 +4,7 @@ mod pbcodec; use anyhow::format_err; use graph::{ - blockchain::{Block as BlockchainBlock, BlockPtr}, + blockchain::{Block as BlockchainBlock, BlockPtr, ChainStoreBlock, ChainStoreData}, prelude::{ web3, web3::types::{Bytes, H160, H2048, H256, H64, U256, U64}, @@ -441,6 +441,14 @@ impl BlockchainBlock for Block { fn parent_ptr(&self) -> Option { self.parent_ptr() } + + // This implementation provides the timestamp so that it works with block _meta's timestamp. + // However, the firehose types will not populate the transaction receipts so switching back + // from firehose ingestor to the firehose ingestor will prevent non final block from being + // processed using the block stored by firehose. + fn data(&self) -> Result { + self.header().to_json() + } } impl HeaderOnlyBlock { @@ -449,6 +457,25 @@ impl HeaderOnlyBlock { } } +impl Into for &BlockHeader { + fn into(self) -> ChainStoreData { + ChainStoreData { + block: ChainStoreBlock::new( + self.timestamp.as_ref().unwrap().seconds, + jsonrpc_core::Value::Null, + ), + } + } +} + +impl BlockHeader { + fn to_json(&self) -> Result { + let chain_store_data: ChainStoreData = self.into(); + + jsonrpc_core::to_value(chain_store_data) + } +} + impl<'a> From<&'a HeaderOnlyBlock> for BlockPtr { fn from(b: &'a HeaderOnlyBlock) -> BlockPtr { BlockPtr::from(b.header()) @@ -467,4 +494,43 @@ impl BlockchainBlock for HeaderOnlyBlock { fn parent_ptr(&self) -> Option { self.header().parent_ptr() } + + // This implementation provides the timestamp so that it works with block _meta's timestamp. + // However, the firehose types will not populate the transaction receipts so switching back + // from firehose ingestor to the firehose ingestor will prevent non final block from being + // processed using the block stored by firehose. + fn data(&self) -> Result { + self.header().to_json() + } +} + +#[cfg(test)] +mod test { + use graph::{blockchain::Block as _, prelude::chrono::Utc}; + use prost_types::Timestamp; + + use crate::codec::BlockHeader; + + use super::Block; + + #[test] + fn ensure_block_serialization() { + let now = Utc::now().timestamp(); + let mut block = Block::default(); + let mut header = BlockHeader::default(); + header.timestamp = Some(Timestamp { + seconds: now, + nanos: 0, + }); + + block.header = Some(header); + + let str_block = block.data().unwrap().to_string(); + + assert_eq!( + str_block, + // if you're confused when reading this, format needs {{ to escape { + format!(r#"{{"block":{{"data":null,"timestamp":"{}"}}}}"#, now) + ); + } } diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 25b9026cc78..d432c1ba726 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -71,11 +71,55 @@ pub trait Block: Send + Sync { } /// The data that should be stored for this block in the `ChainStore` + /// TODO: Return ChainStoreData once it is available for all chains fn data(&self) -> Result { Ok(serde_json::Value::Null) } } +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +/// This is the root data for the chain store. This stucture provides backwards +/// compatibility with existing data for ethereum. +pub struct ChainStoreData { + pub block: ChainStoreBlock, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +/// ChainStoreBlock is intended to standardize the information stored in the data +/// field of the ChainStore. All the chains should eventually return this type +/// on the data() implementation for block. This will ensure that any part of the +/// structured data can be relied upon for all chains. +pub struct ChainStoreBlock { + /// Unix timestamp (seconds since epoch), can be stored as hex or decimal. + timestamp: String, + data: serde_json::Value, +} + +impl ChainStoreBlock { + pub fn new(unix_timestamp: i64, data: serde_json::Value) -> Self { + Self { + timestamp: unix_timestamp.to_string(), + data, + } + } + + pub fn timestamp_str(&self) -> &str { + &self.timestamp + } + + pub fn timestamp(&self) -> i64 { + let (rdx, i) = if self.timestamp.starts_with("0x") { + (16, 2) + } else { + (10, 0) + }; + + i64::from_str_radix(&self.timestamp[i..], rdx).unwrap_or(0) + } +} + #[async_trait] // This is only `Debug` because some tests require that pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static { diff --git a/node/src/main.rs b/node/src/main.rs index acc4d5a8315..b0755429c84 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -1,5 +1,6 @@ use clap::Parser as _; use ethereum::chain::{EthereumAdapterSelector, EthereumBlockRefetcher, EthereumStreamBuilder}; +use ethereum::codec::HeaderOnlyBlock; use ethereum::{ BlockIngestor as EthereumBlockIngestor, EthereumAdapterTrait, EthereumNetworks, RuntimeAdapter, }; @@ -373,13 +374,47 @@ async fn main() { if !opt.disable_block_ingestor { if ethereum_chains.len() > 0 { let block_polling_interval = Duration::from_millis(opt.ethereum_polling_interval); + // Each chain contains both the rpc and firehose endpoints so provided + // IS_FIREHOSE_PREFERRED is set to true, a chain will use firehose if it has + // endpoints set but chains are essentially guaranteed to use EITHER firehose or RPC + // but will never start both. + let (firehose_eth_chains, polling_eth_chains): (HashMap<_, _>, HashMap<_, _>) = + ethereum_chains + .into_iter() + .partition(|(_, chain)| chain.is_firehose_supported()); start_block_ingestor( &logger, &logger_factory, block_polling_interval, - ethereum_chains, + polling_eth_chains, ); + + firehose_networks_by_kind + .get(&BlockchainKind::Ethereum) + .map(|eth_firehose_endpoints| { + start_firehose_block_ingestor::<_, HeaderOnlyBlock>( + &logger, + &network_store, + firehose_eth_chains + .into_iter() + .map(|(name, chain)| { + let firehose_endpoints = eth_firehose_endpoints + .networks + .get(&name) + .expect(&format!("chain {} to have endpoints", name)) + .clone(); + ( + name, + FirehoseChain { + chain, + firehose_endpoints, + }, + ) + }) + .collect(), + ) + }); } start_firehose_block_ingestor::<_, ArweaveBlock>( diff --git a/store/postgres/tests/store.rs b/store/postgres/tests/store.rs index b8b2c9480d1..c82b347acfb 100644 --- a/store/postgres/tests/store.rs +++ b/store/postgres/tests/store.rs @@ -2028,6 +2028,39 @@ fn parse_timestamp() { }) } +#[test] +fn parse_timestamp_firehose() { + const EXPECTED_TS: u64 = 1657712166; + + run_test(|store, _, _| async move { + use block_store::*; + // The test subgraph is at block 2. Since we don't ever delete + // the genesis block, the only block eligible for cleanup is BLOCK_ONE + // and the first retained block is block 2. + block_store::set_chain( + vec![ + &*GENESIS_BLOCK, + &*BLOCK_ONE, + &*BLOCK_TWO, + &*BLOCK_THREE_TIMESTAMP_FIREHOSE, + ], + NETWORK_NAME, + ); + let chain_store = store + .block_store() + .chain_store(NETWORK_NAME) + .expect("fake chain store"); + + let (_network, number, timestamp) = chain_store + .block_number(&BLOCK_THREE_TIMESTAMP_FIREHOSE.block_hash()) + .await + .expect("block_number to return correct number and timestamp") + .unwrap(); + assert_eq!(number, 3); + assert_eq!(timestamp.unwrap(), EXPECTED_TS); + }) +} + #[test] /// checks if retrieving the timestamp from the data blob works. /// on ethereum, the block has timestamp as U256 so it will always have a value diff --git a/store/test-store/Cargo.toml b/store/test-store/Cargo.toml index 1ddc550eddc..c17cf692e75 100644 --- a/store/test-store/Cargo.toml +++ b/store/test-store/Cargo.toml @@ -12,7 +12,9 @@ graph-mock = { path = "../../mock" } graph-node = { path = "../../node" } graph = { path = "../../graph" } graph-store-postgres = { path = "../postgres" } +graph-chain-ethereum= { path = "../../chain/ethereum" } lazy_static = "1.1" hex-literal = "0.3" diesel = { version = "1.4.8", features = ["postgres", "serde_json", "numeric", "r2d2"] } serde = "1.0" +prost-types = "0.10.1" diff --git a/store/test-store/src/block_store.rs b/store/test-store/src/block_store.rs index dafb2aebed5..fc3e40d08c8 100644 --- a/store/test-store/src/block_store.rs +++ b/store/test-store/src/block_store.rs @@ -4,12 +4,14 @@ use lazy_static::lazy_static; use graph::components::store::BlockStore; use graph::{ - blockchain::Block, + blockchain::Block as BlockchainBlock, prelude::{ serde_json, web3::types::H256, web3::types::U256, BlockHash, BlockNumber, BlockPtr, EthereumBlock, LightEthereumBlock, }, }; +use graph_chain_ethereum::codec::{Block, BlockHeader}; +use prost_types::Timestamp; lazy_static! { // Genesis block @@ -33,6 +35,7 @@ lazy_static! { pub static ref BLOCK_THREE: FakeBlock = BLOCK_TWO.make_child("7347afe69254df06729e123610b00b8b11f15cfae3241f9366fb113aec07489c", None); pub static ref BLOCK_THREE_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(3, "fa9ebe3f74de4c56908b49f5c4044e85825f7350f3fa08a19151de82a82a7313"); pub static ref BLOCK_THREE_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", Some(U256::from(1657712166))); + pub static ref BLOCK_THREE_TIMESTAMP_FIREHOSE: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986f", Some(U256::from(1657712166))); // This block is special and serializes in a slightly different way, this is needed to simulate non-ethereum behaviour at the store level. If you're not sure // what you are doing, don't use this block for other tests. pub static ref BLOCK_THREE_NO_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", None); @@ -96,9 +99,25 @@ impl FakeBlock { transaction_receipts: Vec::new(), } } + + pub fn as_firehose_block(&self) -> Block { + let mut block = Block::default(); + block.hash = self.hash.clone().into_bytes(); + block.number = self.number as u64; + + let mut header = BlockHeader::default(); + header.parent_hash = self.parent_hash.clone().into_bytes(); + header.timestamp = self.timestamp.map(|ts| Timestamp { + seconds: i64::from_str_radix(&ts.to_string(), 10).unwrap(), + nanos: 0, + }); + block.header = Some(header); + + block + } } -impl Block for FakeBlock { +impl BlockchainBlock for FakeBlock { fn ptr(&self) -> BlockPtr { self.block_ptr() } @@ -115,7 +134,12 @@ impl Block for FakeBlock { } fn data(&self) -> Result { - let mut value: serde_json::Value = serde_json::to_value(self.as_ethereum_block())?; + let mut value: serde_json::Value = if self.eq(&BLOCK_THREE_TIMESTAMP_FIREHOSE) { + self.as_firehose_block().data().unwrap() + } else { + serde_json::to_value(self.as_ethereum_block())? + }; + if !self.eq(&BLOCK_THREE_NO_TIMESTAMP) { return Ok(value); }; @@ -145,6 +169,9 @@ pub fn set_chain(chain: FakeBlockList, network: &str) { .block_store() .chain_store(network) .unwrap(); - let chain: Vec<&dyn Block> = chain.iter().map(|block| *block as &dyn Block).collect(); + let chain: Vec<&dyn BlockchainBlock> = chain + .iter() + .map(|block| *block as &dyn BlockchainBlock) + .collect(); store.set_chain(&GENESIS_BLOCK.hash, chain); }