Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle null blocks from Lotus #5294

Merged
merged 16 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --release
args: --release
3 changes: 2 additions & 1 deletion chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down Expand Up @@ -241,6 +241,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
}
Expand Down
3 changes: 2 additions & 1 deletion chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}
Expand All @@ -194,7 +195,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down
18 changes: 11 additions & 7 deletions chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,13 +1140,6 @@ pub trait EthereumAdapter: Send + Sync + 'static {
Box<dyn std::future::Future<Output = Result<EthereumBlock, bc::IngestorError>> + Send + '_>,
>;

/// Load block pointer for the specified `block number`.
fn block_pointer_from_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Box<dyn Future<Item = BlockPtr, Error = bc::IngestorError> + Send>;

/// Find a block by its number, according to the Ethereum node.
///
/// Careful: don't use this function without considering race conditions.
Expand All @@ -1162,6 +1155,17 @@ pub trait EthereumAdapter: Send + Sync + 'static {
block_number: BlockNumber,
) -> Box<dyn Future<Item = Option<H256>, Error = Error> + Send>;

/// Finds the hash and number of the lowest non-null block with height greater than or equal to
/// the given number.
///
/// Note that the same caveats on reorgs apply as for `block_hash_by_block_number`, and must
/// also be considered for the resolved block, in case it is higher than the requested number.
async fn next_existing_ptr_to_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Result<BlockPtr, Error>;
incrypto32 marked this conversation as resolved.
Show resolved Hide resolved

/// Call the function of a smart contract. A return of `None` indicates
/// that the call reverted. The returned `CallSource` indicates where
/// the result came from for accounting purposes
Expand Down
11 changes: 6 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,9 @@ impl Blockchain for Chain {
.clone();

adapter
.block_pointer_from_number(logger, number)
.compat()
.next_existing_ptr_to_number(logger, number)
.await
.map_err(From::from)
}
}
}
Expand Down Expand Up @@ -673,7 +673,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
from: BlockNumber,
to: BlockNumber,
filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
blocks_with_triggers(
self.chain_client.rpc()?.cheapest_with(&self.capabilities)?,
self.logger.clone(),
Expand Down Expand Up @@ -707,7 +707,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
BlockFinality::Final(_) => {
let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?;
let block_number = block.number() as BlockNumber;
let blocks = blocks_with_triggers(
let (blocks, _) = blocks_with_triggers(
adapter,
logger.clone(),
self.chain_store.clone(),
Expand Down Expand Up @@ -747,11 +747,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
ptr: BlockPtr,
offset: BlockNumber,
root: Option<BlockHash>,
incrypto32 marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<Option<BlockFinality>, Error> {
let block: Option<EthereumBlock> = self
.chain_store
.cheap_clone()
.ancestor_block(ptr, offset)
.ancestor_block(ptr, offset, root)
.await?
.map(json::from_value)
.transpose()?;
Expand Down
141 changes: 89 additions & 52 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ impl EthereumAdapter {
stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| {
let web3 = web3.clone();
retry(format!("load block ptr {}", block_num), &logger)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand All @@ -810,8 +811,16 @@ impl EthereumAdapter {
.boxed()
.compat()
.from_err()
.then(|res| {
if detect_null_block(&res) {
Ok(None)
} else {
Some(res).transpose()
}
})
}))
.buffered(ENV_VARS.block_batch_size)
.filter_map(|b| b)
.map(|b| b.into())
}

Expand All @@ -830,13 +839,12 @@ impl EthereumAdapter {
logger: &Logger,
block_ptr: BlockPtr,
) -> Result<bool, Error> {
let block_hash = self
.block_hash_by_block_number(logger, block_ptr.number)
.compat()
// TODO: This considers null blocks, but we could instead bail if we encounter one as a
// small optimization.
let canonical_block = self
.next_existing_ptr_to_number(logger, block_ptr.number)
.await?;
block_hash
.ok_or_else(|| anyhow!("Ethereum node is missing block #{}", block_ptr.number))
.map(|block_hash| block_hash == block_ptr.hash_as_h256())
Ok(canonical_block == block_ptr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check will always be true for chains that are not Lotus, right? And this check requires that we make an RPC call, i.e., introduces overhead for chains that don't need it. I would much prefer an approach where Lotus gets its own EthereumAdapter, something like

struct LotusAdapter {
  inner: chain::ethereum::EthereumAdapter
}

where the impl of trait EthereumAdapter just forwards calls to inner when that's appropriate and has its own Lotus-specific impl like for this method when needed. That way, existing chains don't pay a price for the additional checks that Lotus requires.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check will always be true for chains that are not Lotus, right?

I dont understand this, this check can be false on other chains as well i believe, and even before the PR it still needed an RPC call the difference now is that for Lotus it might need more than one call, but for other chains it will still need the one call which was present even before this PR.

Am i missing something?

}

pub(crate) fn logs_in_block_range(
Expand Down Expand Up @@ -1079,6 +1087,16 @@ impl EthereumAdapter {
}
}

// Detects null blocks as can occur on Filecoin EVM chains, by checking for the FEVM-specific
// error returned when requesting such a null round. Ideally there should be a defined reponse or
// message for this case, or a check that is less dependent on the Filecoin implementation.
fn detect_null_block<T>(res: &Result<T, Error>) -> bool {
match res {
Ok(_) => false,
Err(e) => e.to_string().contains("requested epoch was a null round"),
}
}

#[async_trait]
impl EthereumAdapterTrait for EthereumAdapter {
fn provider(&self) -> &str {
Expand Down Expand Up @@ -1363,26 +1381,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
Box::pin(block_future)
}

fn block_pointer_from_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Box<dyn Future<Item = BlockPtr, Error = IngestorError> + Send> {
Box::new(
self.block_hash_by_block_number(logger, block_number)
.and_then(move |block_hash_opt| {
block_hash_opt.ok_or_else(|| {
anyhow!(
"Ethereum node could not find start block hash by block number {}",
&block_number
)
})
})
.from_err()
.map(move |block_hash| BlockPtr::from((block_hash, block_number))),
)
}

fn block_hash_by_block_number(
&self,
logger: &Logger,
Expand Down Expand Up @@ -1448,6 +1446,54 @@ impl EthereumAdapterTrait for EthereumAdapter {
Box::new(self.code(logger, address, block_ptr))
}

async fn next_existing_ptr_to_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Result<BlockPtr, Error> {
let mut next_number = block_number;
loop {
let retry_log_message = format!(
"eth_getBlockByNumber RPC call for block number {}",
next_number
);
let web3 = self.web3.clone();
let logger = logger.clone();
let res = retry(retry_log_message, &logger)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
let web3 = web3.cheap_clone();
async move {
web3.eth()
.block(BlockId::Number(next_number.into()))
.await
.map(|block_opt| block_opt.and_then(|block| block.hash))
.map_err(Error::from)
}
})
.await
.map_err(move |e| {
e.into_inner().unwrap_or_else(move || {
anyhow!(
"Ethereum node took too long to return data for block #{}",
next_number
)
})
});
if detect_null_block(&res) {
next_number += 1;
continue;
}
return match res {
Ok(Some(hash)) => Ok(BlockPtr::new(hash.into(), next_number)),
Ok(None) => Err(anyhow!("Block {} does not contain hash", next_number)),
Err(e) => Err(e),
};
}
}

async fn contract_call(
&self,
logger: &Logger,
Expand Down Expand Up @@ -1652,9 +1698,10 @@ impl EthereumAdapterTrait for EthereumAdapter {
}
}

/// Returns blocks with triggers, corresponding to the specified range and filters.
/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
incrypto32 marked this conversation as resolved.
Show resolved Hide resolved
/// If a block contains no triggers, there may be no corresponding item in the stream.
/// However the `to` block will always be present, even if triggers are empty.
/// However the (resolved) `to` block will always be present, even if triggers are empty.
///
/// Careful: don't use this function without considering race conditions.
/// Chain reorgs could happen at any time, and could affect the answer received.
Expand All @@ -1674,7 +1721,7 @@ pub(crate) async fn blocks_with_triggers(
to: BlockNumber,
filter: &TriggerFilter,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Vec<BlockWithTriggers<crate::Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
// Each trigger filter needs to be queried for the same block range
// and the blocks yielded need to be deduped. If any error occurs
// while searching for a trigger type, the entire operation fails.
Expand All @@ -1685,6 +1732,13 @@ pub(crate) async fn blocks_with_triggers(
let trigger_futs: FuturesUnordered<BoxFuture<Result<Vec<EthereumTrigger>, anyhow::Error>>> =
FuturesUnordered::new();

// Resolve the nearest non-null "to" block
debug!(logger, "Finding nearest valid `to` block to {}", to);

let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?;
let to_hash = to_ptr.hash_as_h256();
let to = to_ptr.block_number();

// This is for `start` triggers which can be initialization handlers which needs to be run
// before all other triggers
if filter.block.trigger_every_block {
Expand Down Expand Up @@ -1753,28 +1807,11 @@ pub(crate) async fn blocks_with_triggers(
trigger_futs.push(block_future)
}

// Get hash for "to" block
let to_hash_fut = eth
.block_hash_by_block_number(&logger, to)
.and_then(|hash| match hash {
Some(hash) => Ok(hash),
None => {
warn!(logger,
"Ethereum endpoint is behind";
"url" => eth.provider()
);
bail!("Block {} not found in the chain", to)
}
})
.compat();

// Join on triggers and block hash resolution
let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut);

// Unpack and handle possible errors in the previously joined futures
let triggers =
triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?;
let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?;
// Join on triggers, unpack and handle possible errors
let triggers = trigger_futs
.try_concat()
.await
.with_context(|| format!("Failed to obtain triggers for block {}", to))?;

let mut block_hashes: HashSet<H256> =
triggers.iter().map(EthereumTrigger::block_hash).collect();
Expand Down Expand Up @@ -1839,7 +1876,7 @@ pub(crate) async fn blocks_with_triggers(
));
}

Ok(blocks)
Ok((blocks, to))
}

pub(crate) async fn get_calls(
Expand Down
3 changes: 2 additions & 1 deletion chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down Expand Up @@ -390,6 +390,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
}
Expand Down
3 changes: 2 additions & 1 deletion chain/starknet/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
}
Expand All @@ -373,7 +374,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &crate::adapter::TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down
Loading
Loading