Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Rename IBFT networking classes (#555)
Browse files Browse the repository at this point in the history
  • Loading branch information
rain-on authored Jan 16, 2019
1 parent d75f6a3 commit ba6e6f4
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package tech.pegasys.pantheon.consensus.ibft.support;

import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster;
import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

Expand All @@ -21,27 +21,26 @@

import com.google.common.collect.Lists;

public class StubIbftMulticaster implements IbftMulticaster {
public class StubValidatorMulticaster implements ValidatorMulticaster {

private final List<ValidatorPeer> validatorNodes = Lists.newArrayList();

public StubIbftMulticaster() {}
public StubValidatorMulticaster() {}

public void addNetworkPeers(final Collection<ValidatorPeer> nodes) {
validatorNodes.addAll(nodes);
}

@Override
public void multicastToValidators(final MessageData message) {
public void send(final MessageData message) {
validatorNodes.forEach(peer -> peer.handleReceivedMessage(message));
}

@Override
public void multicastToValidatorsExcept(
final MessageData message, final Collection<Address> exceptAddresses) {
public void send(final MessageData message, final Collection<Address> blackList) {
validatorNodes
.stream()
.filter(peer -> !exceptAddresses.contains(peer.getNodeAddress()))
.filter(peer -> !blackList.contains(peer.getNodeAddress()))
.forEach(peer -> peer.handleReceivedMessage(message));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ public static TestContext createTestEnvironment(
final KeyPair nodeKeys = networkNodes.getLocalNode().getNodeKeyPair();

// Use a stubbed version of the multicaster, to prevent creating PeerConnections etc.
final StubIbftMulticaster stubbedNetworkPeers = new StubIbftMulticaster();
final StubValidatorMulticaster stubbedMulticaster = new StubValidatorMulticaster();

final ControllerAndState controllerAndState =
createControllerAndFinalState(blockChain, stubbedNetworkPeers, nodeKeys, clock);
createControllerAndFinalState(blockChain, stubbedMulticaster, nodeKeys, clock);

// Add each networkNode to the Multicaster (such that each can receive msgs from local node).
// NOTE: the remotePeers needs to be ordered based on Address (as this is used to determine
Expand All @@ -150,7 +150,7 @@ public static TestContext createTestEnvironment(
},
LinkedHashMap::new));

stubbedNetworkPeers.addNetworkPeers(remotePeers.values());
stubbedMulticaster.addNetworkPeers(remotePeers.values());

return new TestContext(
remotePeers,
Expand Down Expand Up @@ -186,7 +186,7 @@ private static Block createGenesisBlock(final Set<Address> validators) {

private static ControllerAndState createControllerAndFinalState(
final MutableBlockchain blockChain,
final StubIbftMulticaster stubbedNetworkPeers,
final StubValidatorMulticaster stubbedMulticaster,
final KeyPair nodeKeys,
final Clock clock) {

Expand Down Expand Up @@ -242,7 +242,7 @@ private static ControllerAndState createControllerAndFinalState(
nodeKeys,
Util.publicKeyToAddress(nodeKeys.getPublicKey()),
proposerSelector,
stubbedNetworkPeers,
stubbedMulticaster,
new RoundTimer(
ibftEventQueue, ROUND_TIMER_SEC * 1000, Executors.newScheduledThreadPool(1)),
new BlockTimer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import tech.pegasys.pantheon.consensus.ibft.messagedata.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster;
import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.consensus.ibft.payload.SignedData;
import tech.pegasys.pantheon.crypto.SECP256K1.Signature;
import tech.pegasys.pantheon.ethereum.core.Address;
Expand All @@ -35,7 +35,7 @@

/** Class responsible for rebroadcasting IBFT messages to known validators */
public class IbftGossip {
private final IbftMulticaster peers;
private final ValidatorMulticaster multicaster;

// Size of the seenMessages cache, should end up utilising 65bytes * this number + some meta data
private final int maxSeenMessages;
Expand All @@ -50,18 +50,18 @@ protected boolean removeEldestEntry(final Map.Entry<Signature, Boolean> eldest)
}
});

IbftGossip(final IbftMulticaster peers, final int maxSeenMessages) {
IbftGossip(final ValidatorMulticaster multicaster, final int maxSeenMessages) {
this.maxSeenMessages = maxSeenMessages;
this.peers = peers;
this.multicaster = multicaster;
}

/**
* Constructor that attaches gossip logic to a set of peers
* Constructor that attaches gossip logic to a set of multicaster
*
* @param peers The always up to date set of connected peers that understand IBFT
* @param multicaster Network connections to the remote validators
*/
public IbftGossip(final IbftMulticaster peers) {
this(peers, 10_000);
public IbftGossip(final ValidatorMulticaster multicaster) {
this(multicaster, 10_000);
}

/**
Expand Down Expand Up @@ -100,7 +100,7 @@ public boolean gossipMessage(final Message message) {
final List<Address> excludeAddressesList =
Lists.newArrayList(
message.getConnection().getPeer().getAddress(), signedData.getSender());
peers.multicastToValidatorsExcept(messageData, excludeAddressesList);
multicaster.send(messageData, excludeAddressesList);
seenMessages.add(signature);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;
package tech.pegasys.pantheon.consensus.ibft.network;

import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.messagedata.CommitMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.NewRoundMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster;
import tech.pegasys.pantheon.consensus.ibft.payload.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.payload.NewRoundPayload;
Expand All @@ -37,10 +36,10 @@
public class IbftMessageTransmitter {

private final MessageFactory messageFactory;
private final IbftMulticaster multicaster;
private final ValidatorMulticaster multicaster;

public IbftMessageTransmitter(
final MessageFactory messageFactory, final IbftMulticaster multicaster) {
final MessageFactory messageFactory, final ValidatorMulticaster multicaster) {
this.messageFactory = messageFactory;
this.multicaster = multicaster;
}
Expand All @@ -51,7 +50,7 @@ public void multicastProposal(final ConsensusRoundIdentifier roundIdentifier, fi

final ProposalMessageData message = ProposalMessageData.create(signedPayload);

multicaster.multicastToValidators(message);
multicaster.send(message);
}

public void multicastPrepare(final ConsensusRoundIdentifier roundIdentifier, final Hash digest) {
Expand All @@ -60,7 +59,7 @@ public void multicastPrepare(final ConsensusRoundIdentifier roundIdentifier, fin

final PrepareMessageData message = PrepareMessageData.create(signedPayload);

multicaster.multicastToValidators(message);
multicaster.send(message);
}

public void multicastCommit(
Expand All @@ -72,7 +71,7 @@ public void multicastCommit(

final CommitMessageData message = CommitMessageData.create(signedPayload);

multicaster.multicastToValidators(message);
multicaster.send(message);
}

public void multicastRoundChange(
Expand All @@ -84,7 +83,7 @@ public void multicastRoundChange(

final RoundChangeMessageData message = RoundChangeMessageData.create(signedPayload);

multicaster.multicastToValidators(message);
multicaster.send(message);
}

public void multicastNewRound(
Expand All @@ -98,6 +97,6 @@ public void multicastNewRound(

final NewRoundMessageData message = NewRoundMessageData.create(signedPayload);

multicaster.multicastToValidators(message);
multicaster.send(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.network;

import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;

public interface PeerConnectionTracker {

void add(final PeerConnection newConnection);

void remove(final PeerConnection removedConnection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

import java.util.Collection;

public interface IbftMulticaster {
public interface ValidatorMulticaster {

void multicastToValidators(final MessageData message);
void send(final MessageData message);

void multicastToValidatorsExcept(
final MessageData message, final Collection<Address> exceptAddresses);
void send(final MessageData message, final Collection<Address> blackList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class IbftNetworkPeers implements IbftMulticaster {
/**
* Responsible for tracking the network peers which have a connection to this node, then
* multicasting packets to ONLY the peers which have been identified as being validators.
*/
public class ValidatorPeers implements ValidatorMulticaster, PeerConnectionTracker {

private static final Logger LOG = LogManager.getLogger();

Expand All @@ -36,34 +40,35 @@ public class IbftNetworkPeers implements IbftMulticaster {
private final Map<Address, PeerConnection> peerConnections = Maps.newConcurrentMap();
private final ValidatorProvider validatorProvider;

public IbftNetworkPeers(final ValidatorProvider validatorProvider) {
public ValidatorPeers(final ValidatorProvider validatorProvider) {
this.validatorProvider = validatorProvider;
}

public void peerAdded(final PeerConnection newConnection) {
@Override
public void add(final PeerConnection newConnection) {
final Address peerAddress = newConnection.getPeer().getAddress();
peerConnections.put(peerAddress, newConnection);
}

public void peerRemoved(final PeerConnection removedConnection) {
@Override
public void remove(final PeerConnection removedConnection) {
final Address peerAddress = removedConnection.getPeer().getAddress();
peerConnections.remove(peerAddress);
}

@Override
public void multicastToValidators(final MessageData message) {
public void send(final MessageData message) {
final Collection<Address> validators = validatorProvider.getValidators();
sendMessageToSpecificAddresses(validators, message);
}

@Override
public void multicastToValidatorsExcept(
final MessageData message, final Collection<Address> exceptAddresses) {
public void send(final MessageData message, final Collection<Address> blackList) {
final Collection<Address> includedValidators =
validatorProvider
.getValidators()
.stream()
.filter(a -> !exceptAddresses.contains(a))
.filter(a -> !blackList.contains(a))
.collect(Collectors.toSet());
sendMessageToSpecificAddresses(includedValidators, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvents;
import tech.pegasys.pantheon.consensus.ibft.network.IbftNetworkPeers;
import tech.pegasys.pantheon.consensus.ibft.network.PeerConnectionTracker;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager;
Expand All @@ -32,15 +32,16 @@ public class IbftProtocolManager implements ProtocolManager {
private final IbftEventQueue ibftEventQueue;

private final Logger LOG = LogManager.getLogger();
private final IbftNetworkPeers peers;
private final PeerConnectionTracker peers;

/**
* Constructor for the ibft protocol manager
*
* @param ibftEventQueue Entry point into the ibft event processor
* @param peers iBFT network peers
* @param peers Used to track all connected IBFT peers.
*/
public IbftProtocolManager(final IbftEventQueue ibftEventQueue, final IbftNetworkPeers peers) {
public IbftProtocolManager(
final IbftEventQueue ibftEventQueue, final PeerConnectionTracker peers) {
this.ibftEventQueue = ibftEventQueue;
this.peers = peers;
}
Expand Down Expand Up @@ -84,15 +85,15 @@ public void processMessage(final Capability cap, final Message message) {

@Override
public void handleNewConnection(final PeerConnection peerConnection) {
peers.peerAdded(peerConnection);
peers.add(peerConnection);
}

@Override
public void handleDisconnect(
final PeerConnection peerConnection,
final DisconnectReason disconnectReason,
final boolean initiatedByPeer) {
peers.peerRemoved(peerConnection);
peers.remove(peerConnection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.network.IbftMessageTransmitter;
import tech.pegasys.pantheon.consensus.ibft.payload.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.payload.NewRoundPayload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public IbftController(
ibftFinalState,
ibftBlockHeightManagerFactory,
Maps.newHashMap(),
new IbftGossip(ibftFinalState.getPeers()));
new IbftGossip(ibftFinalState.getValidatorMulticaster()));
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit ba6e6f4

Please sign in to comment.