Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove unused private stream type #9357

Merged
merged 1 commit into from
Jul 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 0 additions & 102 deletions crates/net/p2p/src/full_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
error::PeerRequestResult,
headers::client::{HeadersClient, SingleHeaderRequest},
};
use futures::Stream;
use reth_consensus::{Consensus, ConsensusError};
use reth_eth_wire_types::HeadersDirection;
use reth_network_peers::WithPeerId;
Expand Down Expand Up @@ -634,69 +633,6 @@ where
}
}

/// A type that buffers the result of a range request so we can return it as a `Stream`.
struct FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient,
{
/// The inner [`FetchFullBlockRangeFuture`] that is polled.
inner: FetchFullBlockRangeFuture<Client>,
/// The blocks that have been received so far.
///
/// If this is `None` then the request is still in progress. If the vec is empty, then all of
/// the response values have been consumed.
blocks: Option<Vec<SealedBlock>>,
}

impl<Client> From<FetchFullBlockRangeFuture<Client>> for FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient,
{
fn from(inner: FetchFullBlockRangeFuture<Client>) -> Self {
Self { inner, blocks: None }
}
}

impl<Client> Stream for FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient + Unpin + 'static,
{
type Item = SealedBlock;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

// If all blocks have been consumed, then return `None`.
if let Some(blocks) = &mut this.blocks {
if blocks.is_empty() {
// Stream is finished
return Poll::Ready(None)
}

// return the next block if it's ready - the vec should be in ascending order since it
// is reversed right after it is received from the future, so we can just pop() the
// elements to return them from the stream in descending order
return Poll::Ready(blocks.pop())
}

// poll the inner future if the blocks are not yet ready
let mut blocks = ready!(Pin::new(&mut this.inner).poll(cx));

// the blocks are returned in descending order, reverse the list so we can just pop() the
// vec to yield the next block in the stream
blocks.reverse();

// pop the first block from the vec as the first stream element and store the rest
let first_result = blocks.pop();

// if the inner future is ready, then we can return the blocks
this.blocks = Some(blocks);

// return the first block
Poll::Ready(first_result)
}
}

/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
/// futures until they return responses. It will return either the header or body result, depending
/// on which future successfully returned.
Expand Down Expand Up @@ -742,7 +678,6 @@ enum RangeResponseResult {
mod tests {
use super::*;
use crate::test_utils::TestFullBlockClient;
use futures::StreamExt;
use std::ops::Range;

#[tokio::test]
Expand Down Expand Up @@ -808,43 +743,6 @@ mod tests {
}
}

#[tokio::test]
async fn download_full_block_range_stream() {
let client = TestFullBlockClient::default();
let (header, body) = insert_headers_into_client(&client, 0..50);
let client = FullBlockClient::test_client(client);

let future = client.get_full_block_range(header.hash(), 1);
let mut stream = FullBlockRangeStream::from(future);

// ensure only block in the stream is the one we requested
let received = stream.next().await.expect("response should not be None");
assert_eq!(received, SealedBlock::new(header.clone(), body.clone()));

// stream should be done now
assert_eq!(stream.next().await, None);

// there are 11 total blocks
let future = client.get_full_block_range(header.hash(), 11);
let mut stream = FullBlockRangeStream::from(future);

// check first header
let received = stream.next().await.expect("response should not be None");
let mut curr_number = received.number;
assert_eq!(received, SealedBlock::new(header.clone(), body.clone()));

// check the rest of the headers
for _ in 0..10 {
let received = stream.next().await.expect("response should not be None");
assert_eq!(received.number, curr_number - 1);
curr_number = received.number;
}

// ensure stream is done
let received = stream.next().await;
assert!(received.is_none());
}

#[tokio::test]
async fn download_full_block_range_over_soft_limit() {
// default soft limit is 20, so we will request 50 blocks
Expand Down
Loading