Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PAN-2605] Add getPeer method to PeerConnection #1383

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,36 @@

import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.enode.EnodeURL;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Strings;

public class MockPeerConnection implements PeerConnection {

private static final PeerSendHandler NOOP_ON_SEND = (cap, msg, conn) -> {};
private static final AtomicLong ID_GENERATOR = new AtomicLong();
private final PeerSendHandler onSend;
private final Set<Capability> caps;
private volatile boolean disconnected = false;
private final Bytes32 nodeId;
private final BytesValue nodeId;
private final Peer peer;
private final PeerInfo peerInfo;

public MockPeerConnection(final Set<Capability> caps, final PeerSendHandler onSend) {
this.caps = caps;
this.onSend = onSend;
this.nodeId = generateUsefulNodeId();
}

private Bytes32 generateUsefulNodeId() {
// EthPeer only shows the first 20 characters of the node ID so add some padding.
return Bytes32.fromHexStringLenient(
"0x" + ID_GENERATOR.incrementAndGet() + Strings.repeat("0", 46));
this.nodeId = Peer.randomId();
this.peer =
DefaultPeer.fromEnodeURL(
EnodeURL.builder().ipAddress("127.0.0.1").nodeId(nodeId).listeningPort(30303).build());
this.peerInfo = new PeerInfo(5, "Mock", new ArrayList<>(caps), 30303, nodeId);
}

public MockPeerConnection(final Set<Capability> caps) {
Expand All @@ -64,9 +63,14 @@ public Set<Capability> getAgreedCapabilities() {
return caps;
}

@Override
public Peer getPeer() {
return peer;
}

@Override
public PeerInfo getPeerInfo() {
return new PeerInfo(5, "Mock", new ArrayList<>(caps), 0, nodeId);
return peerInfo;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ public Set<Capability> getAgreedCapabilities() {
return new HashSet<>(capabilities);
}

@Override
public Peer getPeer() {
return to;
}

@Override
public PeerInfo getPeerInfo() {
return new PeerInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.api;

import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
Expand Down Expand Up @@ -64,6 +65,9 @@ default void sendForProtocol(final String protocol, final MessageData message)
send(capability(protocol), message);
}

/** @return A representation of the remote peer this node is connected to. */
Peer getPeer();

/**
* Returns the Peer's Description.
*
Expand Down Expand Up @@ -101,10 +105,6 @@ public PeerNotConnected(final String message) {
}

default EnodeURL getRemoteEnode() {
return EnodeURL.builder()
.nodeId(getPeerInfo().getNodeId())
.listeningPort(getPeerInfo().getPort())
.ipAddress(getRemoteAddress().getAddress())
.build();
return getPeer().getEnodeURL();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ protected void initChannel(final SocketChannel ch) {
+ peer.getId()))),
new HandshakeHandlerOutbound(
keyPair,
peer.getId(),
peer,
subProtocols,
ourPeerInfo,
connectionFuture,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.network.exceptions;

public class UnexpectedPeerConnectionException extends RuntimeException {

public UnexpectedPeerConnectionException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.p2p.network.netty;

import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.Framer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.Handshaker;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.ecies.ECIESHandshaker;
Expand Down Expand Up @@ -44,6 +45,8 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte

protected final Handshaker handshaker = new ECIESHandshaker();

// The peer we are expecting to connect to, if such a peer is known
private final Optional<Peer> expectedPeer;
private final PeerInfo ourInfo;

private final Callbacks callbacks;
Expand All @@ -57,12 +60,14 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
AbstractHandshakeHandler(
final List<SubProtocol> subProtocols,
final PeerInfo ourInfo,
final Optional<Peer> expectedPeer,
final CompletableFuture<PeerConnection> connectionFuture,
final Callbacks callbacks,
final PeerConnectionRegistry peerConnectionRegistry,
final LabelledMetric<Counter> outboundMessagesCounter) {
this.subProtocols = subProtocols;
this.ourInfo = ourInfo;
this.expectedPeer = expectedPeer;
this.connectionFuture = connectionFuture;
this.callbacks = callbacks;
this.peerConnectionRegistry = peerConnectionRegistry;
Expand Down Expand Up @@ -108,7 +113,13 @@ protected final void channelRead0(final ChannelHandlerContext ctx, final ByteBuf

final ByteToMessageDecoder deFramer =
new DeFramer(
framer, subProtocols, ourInfo, callbacks, connectionFuture, outboundMessagesCounter);
framer,
subProtocols,
ourInfo,
expectedPeer,
callbacks,
connectionFuture,
outboundMessagesCounter);

ctx.channel()
.pipeline()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import tech.pegasys.pantheon.ethereum.p2p.network.exceptions.BreachOfProtocolException;
import tech.pegasys.pantheon.ethereum.p2p.network.exceptions.IncompatiblePeerException;
import tech.pegasys.pantheon.ethereum.p2p.network.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.p2p.network.exceptions.UnexpectedPeerConnectionException;
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.Framer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.FramingException;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
Expand All @@ -28,9 +31,13 @@
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.util.enode.EnodeURL;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -52,6 +59,8 @@ final class DeFramer extends ByteToMessageDecoder {

private final Framer framer;
private final PeerInfo ourInfo;
// The peer we are expecting to connect to, if such a peer is known
private final Optional<Peer> expectedPeer;
private final List<SubProtocol> subProtocols;
private boolean hellosExchanged;
private final LabelledMetric<Counter> outboundMessagesCounter;
Expand All @@ -60,12 +69,14 @@ final class DeFramer extends ByteToMessageDecoder {
final Framer framer,
final List<SubProtocol> subProtocols,
final PeerInfo ourInfo,
final Optional<Peer> expectedPeer,
final Callbacks callbacks,
final CompletableFuture<PeerConnection> connectFuture,
final LabelledMetric<Counter> outboundMessagesCounter) {
this.framer = framer;
this.subProtocols = subProtocols;
this.ourInfo = ourInfo;
this.expectedPeer = expectedPeer;
this.connectFuture = connectFuture;
this.callbacks = callbacks;
this.outboundMessagesCounter = outboundMessagesCounter;
Expand Down Expand Up @@ -99,9 +110,22 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L
final CapabilityMultiplexer capabilityMultiplexer =
new CapabilityMultiplexer(
subProtocols, ourInfo.getCapabilities(), peerInfo.getCapabilities());
final Peer peer = expectedPeer.orElse(createPeer(peerInfo, ctx));
final PeerConnection connection =
new NettyPeerConnection(
ctx, peerInfo, capabilityMultiplexer, callbacks, outboundMessagesCounter);
ctx, peer, peerInfo, capabilityMultiplexer, callbacks, outboundMessagesCounter);

// Check peer is who we expected
if (expectedPeer.isPresent()
&& !Objects.equals(expectedPeer.get().getId(), peerInfo.getNodeId())) {
String unexpectedMsg =
String.format(
"Expected id %s, but got %s", expectedPeer.get().getId(), peerInfo.getNodeId());
connectFuture.completeExceptionally(new UnexpectedPeerConnectionException(unexpectedMsg));
connection.disconnect(DisconnectReason.UNEXPECTED_ID);
}

// Check that we have shared caps
if (capabilityMultiplexer.getAgreedCapabilities().size() == 0) {
LOG.debug(
"Disconnecting from {} because no capabilities are shared.", peerInfo.getClientId());
Expand Down Expand Up @@ -143,6 +167,16 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L
}
}

private Peer createPeer(final PeerInfo peerInfo, final ChannelHandlerContext ctx) {
InetSocketAddress remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress());
return DefaultPeer.fromEnodeURL(
EnodeURL.builder()
.nodeId(peerInfo.getNodeId())
.ipAddress(remoteAddress.getAddress())
.listeningPort(peerInfo.getPort())
.build());
}

@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable throwable)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public HandshakeHandlerInbound(
super(
subProtocols,
ourInfo,
Optional.empty(),
connectionFuture,
callbacks,
peerConnectionRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

import tech.pegasys.pantheon.crypto.SECP256K1;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.Handshaker;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.List;
import java.util.Optional;
Expand All @@ -38,7 +38,7 @@ public final class HandshakeHandlerOutbound extends AbstractHandshakeHandler {

public HandshakeHandlerOutbound(
final SECP256K1.KeyPair kp,
final BytesValue peerId,
final Peer peer,
final List<SubProtocol> subProtocols,
final PeerInfo ourInfo,
final CompletableFuture<PeerConnection> connectionFuture,
Expand All @@ -48,11 +48,12 @@ public HandshakeHandlerOutbound(
super(
subProtocols,
ourInfo,
Optional.of(peer),
connectionFuture,
callbacks,
peerConnectionRegistry,
outboundMessagesCounter);
handshaker.prepareInitiator(kp, SECP256K1.PublicKey.create(peerId));
handshaker.prepareInitiator(kp, SECP256K1.PublicKey.create(peer.getId()));
this.first = handshaker.firstMessage();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
Expand Down Expand Up @@ -53,14 +54,17 @@ final class NettyPeerConnection implements PeerConnection {
private final Callbacks callbacks;
private final CapabilityMultiplexer multiplexer;
private final LabelledMetric<Counter> outboundMessagesCounter;
private final Peer peer;

public NettyPeerConnection(
final ChannelHandlerContext ctx,
final Peer peer,
final PeerInfo peerInfo,
final CapabilityMultiplexer multiplexer,
final Callbacks callbacks,
final LabelledMetric<Counter> outboundMessagesCounter) {
this.ctx = ctx;
this.peer = peer;
this.peerInfo = peerInfo;
this.multiplexer = multiplexer;
this.agreedCapabilities = multiplexer.getAgreedCapabilities();
Expand Down Expand Up @@ -117,6 +121,11 @@ public Capability capability(final String protocol) {
return protocolToCapability.get(protocol);
}

@Override
public Peer getPeer() {
return peer;
}

@Override
public Set<Capability> getAgreedCapabilities() {
return agreedCapabilities;
Expand Down
Loading