Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change(state): Condense CodeTimer::start() and Span::current() calls in state service #6337

Merged
merged 2 commits into from
Mar 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 10 additions & 72 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,13 +894,13 @@ impl Service<Request> for StateService {
#[instrument(name = "state", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future {
req.count_metric();
let timer = CodeTimer::start();
let span = Span::current();

match req {
// Uses queued_non_finalized_blocks and pending_utxos in the StateService
// Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
Request::CommitBlock(prepared) => {
let timer = CodeTimer::start();

self.assert_block_can_be_validated(&prepared);

self.pending_utxos
Expand All @@ -916,7 +916,7 @@ impl Service<Request> for StateService {
// there shouldn't be any other code running in the same task,
// so we don't need to worry about blocking it:
// https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
let span = Span::current();

let rsp_rx = tokio::task::block_in_place(move || {
span.in_scope(|| self.queue_and_commit_non_finalized(prepared))
});
Expand Down Expand Up @@ -948,8 +948,6 @@ impl Service<Request> for StateService {
// Uses queued_finalized_blocks and pending_utxos in the StateService.
// Accesses shared writeable state in the StateService.
Request::CommitFinalizedBlock(finalized) => {
let timer = CodeTimer::start();

// # Consensus
//
// A non-finalized block verification could have called AwaitUtxo
Expand All @@ -973,7 +971,6 @@ impl Service<Request> for StateService {
// The work is all done, the future just waits on a channel for the result
timer.finish(module_path!(), line!(), "CommitFinalizedBlock");

let span = Span::current();
async move {
rsp_rx
.await
Expand All @@ -995,13 +992,11 @@ impl Service<Request> for StateService {
// Uses pending_utxos and queued_non_finalized_blocks in the StateService.
// If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
Request::AwaitUtxo(outpoint) => {
let timer = CodeTimer::start();

// Prepare the AwaitUtxo future from PendingUxtos.
let response_fut = self.pending_utxos.queue(outpoint);
// Only instrument `response_fut`, the ReadStateService already
// instruments its requests with the same span.
let span = Span::current();

let response_fut = response_fut.instrument(span).boxed();

// Check the non-finalized block queue outside the returned future,
Expand Down Expand Up @@ -1151,15 +1146,14 @@ impl Service<ReadRequest> for ReadStateService {
#[instrument(name = "read_state", skip(self, req))]
fn call(&mut self, req: ReadRequest) -> Self::Future {
req.count_metric();
let timer = CodeTimer::start();
let span = Span::current();

match req {
// Used by the StateService.
ReadRequest::Tip => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let tip = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1180,11 +1174,8 @@ impl Service<ReadRequest> for ReadStateService {

// Used by the StateService.
ReadRequest::Depth(hash) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let depth = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1205,11 +1196,8 @@ impl Service<ReadRequest> for ReadStateService {

// Used by the StateService.
ReadRequest::BestChainNextMedianTimePast => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let non_finalized_state = state.latest_non_finalized_state();
Expand All @@ -1234,11 +1222,8 @@ impl Service<ReadRequest> for ReadStateService {

// Used by the get_block (raw) RPC and the StateService.
ReadRequest::Block(hash_or_height) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let block = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1263,11 +1248,8 @@ impl Service<ReadRequest> for ReadStateService {

// For the get_raw_transaction RPC and the StateService.
ReadRequest::Transaction(hash) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let transaction_and_height = state
Expand All @@ -1288,11 +1270,8 @@ impl Service<ReadRequest> for ReadStateService {

// Used by the getblock (verbose) RPC.
ReadRequest::TransactionIdsForBlock(hash_or_height) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
Expand Down Expand Up @@ -1322,11 +1301,8 @@ impl Service<ReadRequest> for ReadStateService {
}

ReadRequest::UnspentBestChainUtxo(outpoint) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let utxo = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1351,11 +1327,8 @@ impl Service<ReadRequest> for ReadStateService {

// Manually used by the StateService to implement part of AwaitUtxo.
ReadRequest::AnyChainUtxo(outpoint) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let utxo = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1376,11 +1349,8 @@ impl Service<ReadRequest> for ReadStateService {

// Used by the StateService.
ReadRequest::BlockLocator => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let block_locator = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1403,11 +1373,8 @@ impl Service<ReadRequest> for ReadStateService {

// Used by the StateService.
ReadRequest::FindBlockHashes { known_blocks, stop } => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let block_hashes = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1434,11 +1401,8 @@ impl Service<ReadRequest> for ReadStateService {

// Used by the StateService.
ReadRequest::FindBlockHeaders { known_blocks, stop } => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let block_headers = state.non_finalized_state_receiver.with_watch_data(
Expand Down Expand Up @@ -1469,11 +1433,8 @@ impl Service<ReadRequest> for ReadStateService {
}

ReadRequest::SaplingTree(hash_or_height) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1497,11 +1458,8 @@ impl Service<ReadRequest> for ReadStateService {
}

ReadRequest::OrchardTree(hash_or_height) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1526,11 +1484,8 @@ impl Service<ReadRequest> for ReadStateService {

// For the get_address_balance RPC.
ReadRequest::AddressBalance(addresses) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let balance = state.non_finalized_state_receiver.with_watch_data(
Expand Down Expand Up @@ -1558,11 +1513,8 @@ impl Service<ReadRequest> for ReadStateService {
addresses,
height_range,
} => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let tx_ids = state.non_finalized_state_receiver.with_watch_data(
Expand Down Expand Up @@ -1594,11 +1546,8 @@ impl Service<ReadRequest> for ReadStateService {

// For the get_address_utxos RPC.
ReadRequest::UtxosByAddresses(addresses) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let utxos = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1623,11 +1572,8 @@ impl Service<ReadRequest> for ReadStateService {
}

ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
let timer = CodeTimer::start();

let state = self.clone();

let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let latest_non_finalized_best_chain =
Expand Down Expand Up @@ -1664,14 +1610,12 @@ impl Service<ReadRequest> for ReadStateService {

// Used by the get_block and get_block_hash RPCs.
ReadRequest::BestChainBlockHash(height) => {
let timer = CodeTimer::start();

let state = self.clone();

// # Performance
//
// Allow other async tasks to make progress while concurrently reading blocks from disk.
let span = Span::current();

tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let hash = state.non_finalized_state_receiver.with_watch_data(
Expand All @@ -1697,15 +1641,13 @@ impl Service<ReadRequest> for ReadStateService {
// Used by get_block_template RPC.
#[cfg(feature = "getblocktemplate-rpcs")]
ReadRequest::ChainInfo => {
let timer = CodeTimer::start();

let state = self.clone();
let latest_non_finalized_state = self.latest_non_finalized_state();

// # Performance
//
// Allow other async tasks to make progress while concurrently reading blocks from disk.
let span = Span::current();

tokio::task::spawn_blocking(move || {
span.in_scope(move || {
// # Correctness
Expand Down Expand Up @@ -1739,14 +1681,12 @@ impl Service<ReadRequest> for ReadStateService {
// Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
#[cfg(feature = "getblocktemplate-rpcs")]
ReadRequest::SolutionRate { num_blocks, height } => {
let timer = CodeTimer::start();

let state = self.clone();

// # Performance
//
// Allow other async tasks to make progress while concurrently reading blocks from disk.
let span = Span::current();

tokio::task::spawn_blocking(move || {
span.in_scope(move || {
let latest_non_finalized_state = state.latest_non_finalized_state();
Expand Down Expand Up @@ -1794,14 +1734,12 @@ impl Service<ReadRequest> for ReadStateService {

#[cfg(feature = "getblocktemplate-rpcs")]
ReadRequest::CheckBlockProposalValidity(prepared) => {
let timer = CodeTimer::start();

let state = self.clone();

// # Performance
//
// Allow other async tasks to make progress while concurrently reading blocks from disk.
let span = Span::current();

tokio::task::spawn_blocking(move || {
span.in_scope(move || {
tracing::info!("attempting to validate and commit block proposal onto a cloned non-finalized state");
Expand Down