compact-block: try to ensure response consistency #4586

Merged · 1 commit · Jun 12, 2024

42 changes: 31 additions & 11 deletions crates/core/component/compact-block/src/component/rpc.rs
@@ -4,8 +4,8 @@ use anyhow::bail;
 use cnidarium::Storage;
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
 use penumbra_proto::core::component::compact_block::v1::{
-    query_service_server::QueryService, CompactBlockRangeRequest, CompactBlockRangeResponse,
-    CompactBlockRequest, CompactBlockResponse,
+    query_service_server::QueryService, CompactBlock, CompactBlockRangeRequest,
+    CompactBlockRangeResponse, CompactBlockRequest, CompactBlockResponse,
 };
 use penumbra_sct::component::clock::EpochRead;
 use tokio::sync::mpsc;
@@ -103,6 +103,11 @@ impl QueryService for Server {
 
         let (tx_blocks, rx_blocks) = mpsc::channel(10);
         let tx_blocks_err = tx_blocks.clone();
+        // Wrap the block sender in a guard that ensures we only send the expected next block
+        let mut tx_blocks = BlockSender {
+            next_height: start_height,
+            inner: tx_blocks,
+        };
         tokio::spawn(
             async move {
                 let _guard = CompactBlockConnectionCounter::new();
@@ -142,7 +147,7 @@ impl QueryService for Server {
                 // Future iterations of this work should start by moving block serialization
                 // outside of the `send_op` future, and investigate if long blocking sends can
                 // happen for benign reasons (i.e not caused by the client).
-                tx_blocks.send(Ok(compact_block)).await?;
+                tx_blocks.send(compact_block).await?;
                 metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
             }
 
@@ -171,10 +176,7 @@ impl QueryService for Server {
                     .await
                     .expect("no error fetching block")
                     .expect("compact block for in-range height must be present");
-                tx_blocks
-                    .send(Ok(block))
-                    .await
-                    .map_err(|_| tonic::Status::cancelled("client closed connection"))?;
+                tx_blocks.send(block).await?;
                 metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
             }
 
@@ -200,10 +202,7 @@
                     .await
                     .map_err(|e| tonic::Status::internal(e.to_string()))?
                     .expect("compact block for in-range height must be present");
-                tx_blocks
-                    .send(Ok(block))
-                    .await
-                    .map_err(|_| tonic::Status::cancelled("channel closed"))?;
+                tx_blocks.send(block).await?;
                 metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1);
             }
         }
@@ -250,3 +249,24 @@ impl Drop for CompactBlockConnectionCounter {
         metrics::gauge!(metrics::COMPACT_BLOCK_RANGE_ACTIVE_CONNECTIONS).decrement(1.0);
     }
 }
+
+/// Stateful wrapper for a mpsc that tracks the outbound height
+struct BlockSender {
+    next_height: u64,
+    inner: mpsc::Sender<Result<CompactBlock, tonic::Status>>,
+}
+
+impl BlockSender {
+    async fn send(&mut self, block: CompactBlock) -> anyhow::Result<()> {
+        if block.height != self.next_height {
+            bail!(
+                "block height mismatch while sending: expected {}, got {}",
+                self.next_height,
+                block.height
+            );
+        }
+        self.inner.send(Ok(block)).await?;
+        self.next_height += 1;
+        Ok(())
+    }
+}
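To make the behavior of the new guard concrete, here is a minimal, self-contained sketch of the same height-gating idea. It is not code from this PR: it uses a hypothetical stand-in Block type with only a height field in place of the generated CompactBlock proto, a String in place of tonic::Status, and assumes only the anyhow and tokio crates (tokio with the "sync", "macros", and "rt-multi-thread" features).

use anyhow::bail;
use tokio::sync::mpsc;

/// Hypothetical stand-in for the generated `CompactBlock` proto; only the
/// height matters for the consistency check.
#[derive(Debug)]
struct Block {
    height: u64,
}

/// Same shape as the PR's `BlockSender`: refuse to forward anything except
/// the next expected height, then advance the expectation.
struct BlockSender {
    next_height: u64,
    inner: mpsc::Sender<Result<Block, String>>,
}

impl BlockSender {
    async fn send(&mut self, block: Block) -> anyhow::Result<()> {
        if block.height != self.next_height {
            bail!(
                "block height mismatch while sending: expected {}, got {}",
                self.next_height,
                block.height
            );
        }
        self.inner.send(Ok(block)).await?;
        self.next_height += 1;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = mpsc::channel(10);
    let mut sender = BlockSender {
        next_height: 5,
        inner: tx,
    };

    // In-order blocks are forwarded and bump the expected height.
    sender.send(Block { height: 5 }).await?;
    sender.send(Block { height: 6 }).await?;

    // A duplicated or skipped height is rejected before it reaches the channel.
    assert!(sender.send(Block { height: 6 }).await.is_err());
    assert!(sender.send(Block { height: 9 }).await.is_err());

    // Only the consistent prefix was delivered to the receiver.
    assert_eq!(rx.recv().await.unwrap().unwrap().height, 5);
    assert_eq!(rx.recv().await.unwrap().unwrap().height, 6);
    println!("height guard behaves as expected");
    Ok(())
}

The effect of the guard is that a duplicated or skipped height turns into an error inside the streaming task instead of an out-of-order block silently reaching the client, which is the response consistency the PR title refers to.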