Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2128] Add getNodeData to EthPeer to enable requesting node data #589

Merged
merged 3 commits into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockBodiesMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetReceiptsMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class EthPeer {
private final RequestManager headersRequestManager = new RequestManager(this);
private final RequestManager bodiesRequestManager = new RequestManager(this);
private final RequestManager receiptsRequestManager = new RequestManager(this);
private final RequestManager nodeDataRequestManager = new RequestManager(this);

private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
Expand Down Expand Up @@ -120,6 +122,8 @@ public ResponseStream send(final MessageData messageData) throws PeerNotConnecte
return sendBodiesRequest(messageData);
case EthPV63.GET_RECEIPTS:
return sendReceiptsRequest(messageData);
case EthPV63.GET_NODE_DATA:
return sendNodeDataRequest(messageData);
default:
connection.sendForProtocol(protocolName, messageData);
return null;
Expand Down Expand Up @@ -168,6 +172,17 @@ private ResponseStream sendReceiptsRequest(final MessageData messageData)
() -> connection.sendForProtocol(protocolName, messageData));
}

public ResponseStream getNodeData(final Iterable<Hash> nodeHashes) throws PeerNotConnected {
final GetNodeDataMessage message = GetNodeDataMessage.create(nodeHashes);
return sendNodeDataRequest(message);
}

private ResponseStream sendNodeDataRequest(final MessageData messageData)
throws PeerNotConnected {
return nodeDataRequestManager.dispatchRequest(
() -> connection.sendForProtocol(protocolName, messageData));
}

boolean validateReceivedMessage(final EthMessage message) {
checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch");
switch (message.getData().getCode()) {
Expand All @@ -189,6 +204,12 @@ boolean validateReceivedMessage(final EthMessage message) {
return false;
}
break;
case EthPV63.NODE_DATA:
if (nodeDataRequestManager.outstandingRequests() == 0) {
LOG.warn("Unsolicited node data received.");
return false;
}
break;
default:
// Nothing to do
}
Expand All @@ -215,6 +236,10 @@ void dispatch(final EthMessage message) {
reputation.resetTimeoutCount(EthPV63.GET_RECEIPTS);
receiptsRequestManager.dispatchResponse(message);
break;
case EthPV63.NODE_DATA:
reputation.resetTimeoutCount(EthPV63.GET_NODE_DATA);
nodeDataRequestManager.dispatchResponse(message);
break;
default:
// Nothing to do
}
Expand All @@ -228,6 +253,7 @@ void handleDisconnect() {
headersRequestManager.close();
bodiesRequestManager.close();
receiptsRequestManager.close();
nodeDataRequestManager.close();
disconnectCallbacks.forEach(callback -> callback.onDisconnect(this));
}

Expand Down Expand Up @@ -294,7 +320,8 @@ public void registerHeight(final Hash blockHash, final long height) {
public int outstandingRequests() {
return headersRequestManager.outstandingRequests()
+ bodiesRequestManager.outstandingRequests()
+ receiptsRequestManager.outstandingRequests();
+ receiptsRequestManager.outstandingRequests()
+ nodeDataRequestManager.outstandingRequests();
}

public BytesValue nodeId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
Expand All @@ -20,14 +22,13 @@
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockBodiesMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -43,33 +44,39 @@ public void getHeadersStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getHeadersByHash(gen.hash(), 5, 0, false);
final MessageData targetMessage =
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()));
final MessageData otherMessage =
BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body()));
BlockHeadersMessage.create(asList(gen.header(), gen.header()));
final MessageData otherMessage = BlockBodiesMessage.create(asList(gen.body(), gen.body()));

messageStream(getStream, targetMessage, otherMessage);
}

@Test
public void getBodiesStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getBodies(Arrays.asList(gen.hash(), gen.hash()));
final MessageData targetMessage =
BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body()));
final MessageData otherMessage =
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()));
(peer) -> peer.getBodies(asList(gen.hash(), gen.hash()));
final MessageData targetMessage = BlockBodiesMessage.create(asList(gen.body(), gen.body()));
final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));

messageStream(getStream, targetMessage, otherMessage);
}

@Test
public void getReceiptsStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getReceipts(Arrays.asList(gen.hash(), gen.hash()));
(peer) -> peer.getReceipts(asList(gen.hash(), gen.hash()));
final MessageData targetMessage =
ReceiptsMessage.create(Collections.singletonList(gen.receipts(gen.block())));
final MessageData otherMessage =
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()));
ReceiptsMessage.create(singletonList(gen.receipts(gen.block())));
final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));

messageStream(getStream, targetMessage, otherMessage);
}

@Test
public void getNodeDataStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getNodeData(asList(gen.hash(), gen.hash()));
final MessageData targetMessage = NodeDataMessage.create(singletonList(gen.bytesValue()));
final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));

messageStream(getStream, targetMessage, otherMessage);
}
Expand All @@ -88,7 +95,7 @@ public void closeStreamsOnPeerDisconnect() throws PeerNotConnected {
});
// Bodies stream
final AtomicInteger bodiesClosedCount = new AtomicInteger(0);
peer.getBodies(Arrays.asList(gen.hash(), gen.hash()))
peer.getBodies(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
Expand All @@ -97,37 +104,47 @@ public void closeStreamsOnPeerDisconnect() throws PeerNotConnected {
});
// Receipts stream
final AtomicInteger receiptsClosedCount = new AtomicInteger(0);
peer.getReceipts(Arrays.asList(gen.hash(), gen.hash()))
peer.getReceipts(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
receiptsClosedCount.incrementAndGet();
}
});
// NodeData stream
final AtomicInteger nodeDataClosedCount = new AtomicInteger(0);
peer.getNodeData(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
nodeDataClosedCount.incrementAndGet();
}
});

// Sanity check
assertThat(headersClosedCount.get()).isEqualTo(0);
assertThat(bodiesClosedCount.get()).isEqualTo(0);
assertThat(receiptsClosedCount.get()).isEqualTo(0);
assertThat(nodeDataClosedCount.get()).isEqualTo(0);

// Disconnect and check
peer.handleDisconnect();
assertThat(headersClosedCount.get()).isEqualTo(1);
assertThat(bodiesClosedCount.get()).isEqualTo(1);
assertThat(receiptsClosedCount.get()).isEqualTo(1);
assertThat(nodeDataClosedCount.get()).isEqualTo(1);
}

@Test
public void listenForMultipleStreams() throws PeerNotConnected {
// Setup peer and messages
final EthPeer peer = createPeer();
final EthMessage headersMessage =
new EthMessage(peer, BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header())));
new EthMessage(peer, BlockHeadersMessage.create(asList(gen.header(), gen.header())));
final EthMessage bodiesMessage =
new EthMessage(peer, BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body())));
new EthMessage(peer, BlockBodiesMessage.create(asList(gen.body(), gen.body())));
final EthMessage otherMessage =
new EthMessage(
peer, ReceiptsMessage.create(Collections.singletonList(gen.receipts(gen.block()))));
new EthMessage(peer, ReceiptsMessage.create(singletonList(gen.receipts(gen.block()))));

// Set up stream for headers
final AtomicInteger headersMessageCount = new AtomicInteger(0);
Expand All @@ -147,7 +164,7 @@ public void listenForMultipleStreams() throws PeerNotConnected {
final AtomicInteger bodiesMessageCount = new AtomicInteger(0);
final AtomicInteger bodiesClosedCount = new AtomicInteger(0);
final ResponseStream bodiesStream =
peer.getBodies(Arrays.asList(gen.hash(), gen.hash()))
peer.getBodies(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
Expand Down Expand Up @@ -265,7 +282,7 @@ private void messageStream(
}

private EthPeer createPeer() {
final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
final Set<Capability> caps = new HashSet<>(singletonList(EthProtocol.ETH63));
final PeerConnection peerConnection = new MockPeerConnection(caps);
final Consumer<EthPeer> onPeerReady = (peer) -> {};
return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady);
Expand Down