Skip to content

Commit

Permalink
[PAN-1339] Send local transactions to new peers (PegaSysEng#1253)
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish authored and notlesh committed Apr 24, 2019
1 parent 9788119 commit 79f06a7
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class PeerTransactionTracker implements DisconnectCallback {
public class PeerTransactionTracker implements DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 10_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import tech.pegasys.pantheon.ethereum.core.AccountFilter;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator;
Expand Down Expand Up @@ -57,18 +59,31 @@ public class TransactionPool implements BlockAddedObserver {
private final TransactionBatchAddedListener transactionBatchAddedListener;
private final SyncState syncState;
private Optional<AccountFilter> accountFilter = Optional.empty();
private final PeerTransactionTracker peerTransactionTracker;

public TransactionPool(
final PendingTransactions pendingTransactions,
final ProtocolSchedule<?> protocolSchedule,
final ProtocolContext<?> protocolContext,
final TransactionBatchAddedListener transactionBatchAddedListener,
final SyncState syncState) {
final SyncState syncState,
final EthContext ethContext,
final PeerTransactionTracker peerTransactionTracker) {
this.pendingTransactions = pendingTransactions;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.transactionBatchAddedListener = transactionBatchAddedListener;
this.syncState = syncState;
this.peerTransactionTracker = peerTransactionTracker;

ethContext.getEthPeers().subscribeConnect(this::handleConnect);
}

private void handleConnect(final EthPeer peer) {
List<Transaction> localTransactions = getLocalTransactions();
for (Transaction transaction : localTransactions) {
peerTransactionTracker.addToPeerSendQueue(peer, transaction);
}
}

public List<Transaction> getLocalTransactions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public static TransactionPool createTransactionPool(
protocolSchedule,
protocolContext,
new TransactionSender(transactionTracker, transactionsMessageSender, ethContext),
syncState);
syncState,
ethContext,
transactionTracker);

final TransactionsMessageHandler transactionsMessageHandler =
new TransactionsMessageHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.TransactionTestFixture;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand All @@ -59,7 +64,9 @@
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -91,19 +98,31 @@ public class TransactionPoolTest {
private TransactionPool transactionPool;
private long genesisBlockGasLimit;
private final AccountFilter accountFilter = mock(AccountFilter.class);
private SyncState syncState;
private EthContext ethContext;
private PeerTransactionTracker peerTransactionTracker;

@Before
public void setUp() {
blockchain = executionContext.getBlockchain();
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec);
when(protocolSpec.getTransactionValidator()).thenReturn(transactionValidator);
genesisBlockGasLimit = executionContext.getGenesis().getHeader().getGasLimit();
SyncState syncState = mock(SyncState.class);
syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);

ethContext = mock(EthContext.class);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
peerTransactionTracker = mock(PeerTransactionTracker.class);
transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);
blockchain.observeBlockAdded(transactionPool);
}

Expand Down Expand Up @@ -434,7 +453,13 @@ public void shouldRejectRemoteTransactionsWhenNotInSync() {
when(syncState.isInSync(anyLong())).thenReturn(false);
TransactionPool transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);

final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
Expand Down Expand Up @@ -462,12 +487,6 @@ public void shouldRejectRemoteTransactionsWhenNotInSync() {

@Test
public void shouldAllowRemoteTransactionsWhenInSync() {
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);
TransactionPool transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);

final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1);
Expand All @@ -491,6 +510,40 @@ public void shouldAllowRemoteTransactionsWhenInSync() {
assertTransactionPending(transaction3);
}

@Test
public void shouldSendOnlyLocalTransactionToNewlyConnectedPeer() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthContext ethContext = ethProtocolManager.ethContext();
PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker();
TransactionPool transactionPool =
new TransactionPool(
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);

final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transactionLocal = builder.nonce(1).createTransaction(KEY_PAIR1);
final Transaction transactionRemote = builder.nonce(2).createTransaction(KEY_PAIR1);
when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid());
when(transactionValidator.validateForSender(
any(Transaction.class), nullable(Account.class), eq(true)))
.thenReturn(valid());

transactionPool.addLocalTransaction(transactionLocal);
transactionPool.addRemoteTransactions(Collections.singletonList(transactionRemote));

RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager);

Set<Transaction> transactionsToSendToPeer =
peerTransactionTracker.claimTransactionsToSendToPeer(peer.getEthPeer());

assertThat(transactionsToSendToPeer).containsExactly(transactionLocal);
}

private void assertTransactionPending(final Transaction t) {
assertThat(transactions.getTransactionByHash(t.hash())).contains(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
Expand All @@ -30,7 +31,10 @@
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.PeerTransactionTracker;
import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
Expand Down Expand Up @@ -85,13 +89,21 @@ public void setUp() {
final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create();
blockchain = executionContext.getBlockchain();
final ProtocolContext<Void> protocolContext = executionContext.getProtocolContext();

PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class);
EthContext ethContext = mock(EthContext.class);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

transactionPool =
new TransactionPool(
transactions,
executionContext.getProtocolSchedule(),
protocolContext,
batchAddedListener,
syncState);
syncState,
ethContext,
peerTransactionTracker);
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, protocolContext.getWorldStateArchive());
filterManager =
Expand Down

0 comments on commit 79f06a7

Please sign in to comment.