Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Implement max message size rather than limiting with fixed number of transactions #1271

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ffadf39
Implement max message size rather then cap with fixed number of trans…
AbdelStark Apr 12, 2019
f9de190
fix final variables
AbdelStark Apr 12, 2019
2dfd4ac
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 12, 2019
1357795
Update AbstractRLPOutput.java
AbdelStark Apr 12, 2019
b9052b5
pr discussion
AbdelStark Apr 12, 2019
1d07a42
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 12, 2019
2260bfe
SpotlessApply
AbdelStark Apr 12, 2019
16f21dc
Merge remote-tracking branch 'upstream/master' into feature/pan-1005-…
AbdelStark Apr 15, 2019
2cf5cbd
fix PR discussion
AbdelStark Apr 15, 2019
058cea9
Update LimitedTransactionsMessages.java
AbdelStark Apr 15, 2019
30020e0
Merge remote-tracking branch 'upstream/master' into feature/pan-1005-…
AbdelStark Apr 15, 2019
3eccfb7
fix PR discussion
AbdelStark Apr 15, 2019
e7bf11e
Merge remote-tracking branch 'upstream/master' into feature/pan-1005-…
AbdelStark Apr 15, 2019
40372be
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 15, 2019
398c3ab
Merge remote-tracking branch 'upstream/master' into feature/pan-1005-…
AbdelStark Apr 15, 2019
0073f1e
Update AbstractRLPOutput.java
AbdelStark Apr 15, 2019
40e15d6
Merge branch 'feature/pan-1005-tx-limit-size-send-peers' of https://g…
AbdelStark Apr 15, 2019
bdefa72
Update ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/…
mbaxter Apr 15, 2019
07924dc
Update Transaction.java
AbdelStark Apr 15, 2019
4d059cd
Merge branch 'feature/pan-1005-tx-limit-size-send-peers' of https://g…
AbdelStark Apr 15, 2019
c8ca937
fix PR discussion
AbdelStark Apr 15, 2019
0f94204
fix PR discussion
AbdelStark Apr 15, 2019
8899e78
Update BlockDataGenerator.java
AbdelStark Apr 15, 2019
4066a17
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 17, 2019
2112b83
Update LimitedTransactionsMessagesTest.java
AbdelStark Apr 17, 2019
52cd58e
Update LimitedTransactionsMessagesTest.java
AbdelStark Apr 17, 2019
61579c8
Update LimitedTransactionsMessagesTest.java
AbdelStark Apr 17, 2019
038e4d4
Update LimitedTransactionsMessagesTest.java
AbdelStark Apr 17, 2019
6f3e08f
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 17, 2019
268e065
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;

import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;

public class DefaultTransactionMessageLimiter implements TransactionsMessageLimiter {

private static final int LIMIT = 1048576;

@Override
public boolean isLastTransactionToIncludeWithoutExceedingLimit(final BytesValueRLPOutput tmp) {
return tmp.uncheckedEncodedSize() >= LIMIT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;

import tech.pegasys.pantheon.ethereum.core.Transaction;

import java.util.Set;

public final class LimitedTransactionsMessages {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved

private final TransactionsMessage transactionsMessage;
private final Set<Transaction> includedTransactions;

public LimitedTransactionsMessages(
final TransactionsMessage transactionsMessage, final Set<Transaction> includedTransactions) {
this.transactionsMessage = transactionsMessage;
this.includedTransactions = includedTransactions;
}

public final TransactionsMessage getTransactionsMessage() {
return transactionsMessage;
}

public final Set<Transaction> getIncludedTransactions() {
return includedTransactions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Function;

public class TransactionsMessage extends AbstractMessageData {
Expand All @@ -47,7 +49,24 @@ public static TransactionsMessage create(final Iterable<Transaction> transaction
return new TransactionsMessage(tmp.encoded());
}

private TransactionsMessage(final BytesValue data) {
public static LimitedTransactionsMessages createLimited(
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
final Iterable<Transaction> transactions, final TransactionsMessageLimiter limiter) {
final Set<Transaction> includedTransactions = new HashSet<>();
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
for (final Transaction transaction : transactions) {
transaction.writeTo(tmp);
includedTransactions.add(transaction);
if (limiter.isLastTransactionToIncludeWithoutExceedingLimit(tmp)) {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
break;
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
}
}
tmp.endList();
return new LimitedTransactionsMessages(
new TransactionsMessage(tmp.encoded()), includedTransactions);
}

TransactionsMessage(final BytesValue data) {
super(data);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;

import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;

public interface TransactionsMessageLimiter {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved

boolean isLastTransactionToIncludeWithoutExceedingLimit(final BytesValueRLPOutput tmp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tech.pegasys.pantheon.ethereum.core.PendingTransactions;
import tech.pegasys.pantheon.ethereum.core.TransactionPool;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.messages.DefaultTransactionMessageLimiter;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
Expand All @@ -36,7 +37,7 @@ public static TransactionPool createTransactionPool(

final PeerTransactionTracker transactionTracker = new PeerTransactionTracker();
final TransactionsMessageSender transactionsMessageSender =
new TransactionsMessageSender(transactionTracker);
new TransactionsMessageSender(transactionTracker, new DefaultTransactionMessageLimiter());

final TransactionPool transactionPool =
new TransactionPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,42 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.util.stream.Collectors.toSet;

import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.messages.LimitedTransactionsMessages;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessageLimiter;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;

import java.util.Set;
import java.util.stream.StreamSupport;

class TransactionsMessageSender {

private static final int MAX_BATCH_SIZE = 10;
private final PeerTransactionTracker transactionTracker;
private final TransactionsMessageLimiter transactionsMessageLimiter;

public TransactionsMessageSender(final PeerTransactionTracker transactionTracker) {
public TransactionsMessageSender(
final PeerTransactionTracker transactionTracker,
final TransactionsMessageLimiter transactionsMessageLimiter) {
this.transactionTracker = transactionTracker;
this.transactionsMessageLimiter = transactionsMessageLimiter;
}

public void sendTransactionsToPeers() {
transactionTracker.getEthPeersWithUnsentTransactions().forEach(this::sendTransactionsToPeer);
StreamSupport.stream(transactionTracker.getEthPeersWithUnsentTransactions().spliterator(), true)
.parallel()
.forEach(this::sendTransactionsToPeer);
}

private void sendTransactionsToPeer(final EthPeer peer) {
final Set<Transaction> allTxToSend = transactionTracker.claimTransactionsToSendToPeer(peer);
while (!allTxToSend.isEmpty()) {
final Set<Transaction> subsetToSend =
allTxToSend.stream().limit(MAX_BATCH_SIZE).collect(toSet());
allTxToSend.removeAll(subsetToSend);
final LimitedTransactionsMessages limitedTransactionsMessages =
TransactionsMessage.createLimited(allTxToSend, transactionsMessageLimiter);
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) You could also have LimitedTransactionsMessages.createLimited return a stream of TransactionMessages, then you don't need the extra removeAll bookkeeping and the extra memory for the included tx's list.

try {
peer.send(TransactionsMessage.create(subsetToSend));
peer.send(limitedTransactionsMessages.getTransactionsMessage());
} catch (final PeerNotConnected e) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
import static com.google.common.collect.Sets.newHashSet;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessageLimiter;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.List;
Expand All @@ -40,14 +43,17 @@ public class TransactionsMessageSenderTest {

private final EthPeer peer1 = mock(EthPeer.class);
private final EthPeer peer2 = mock(EthPeer.class);
private final TransactionsMessageLimiter transactionsMessageLimiter =
mock(TransactionsMessageLimiter.class);

private final BlockDataGenerator generator = new BlockDataGenerator();
private final Transaction transaction1 = generator.transaction();
private final Transaction transaction2 = generator.transaction();
private final Transaction transaction3 = generator.transaction();

private final PeerTransactionTracker transactionTracker = new PeerTransactionTracker();
private final TransactionsMessageSender messageSender =
new TransactionsMessageSender(transactionTracker);
new TransactionsMessageSender(transactionTracker, transactionsMessageLimiter);

@Test
public void shouldSendTransactionsToEachPeer() throws Exception {
Expand All @@ -63,14 +69,16 @@ public void shouldSendTransactionsToEachPeer() throws Exception {
}

@Test
public void shouldSendTransactionsInBatches() throws Exception {
final Set<Transaction> fifteenTransactions =
IntStream.range(0, 15).mapToObj(number -> generator.transaction()).collect(toSet());
fifteenTransactions.forEach(
public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
final Set<Transaction> threeTransactions =
IntStream.range(0, 3).mapToObj(number -> generator.transaction()).collect(toSet());
threeTransactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));

when(transactionsMessageLimiter.isLastTransactionToIncludeWithoutExceedingLimit(any()))
.thenReturn(false)
.thenReturn(true);
messageSender.sendTransactionsToPeers();

final ArgumentCaptor<MessageData> messageDataArgumentCaptor =
ArgumentCaptor.forClass(MessageData.class);
verify(peer1, times(2)).send(messageDataArgumentCaptor.capture());
Expand All @@ -82,10 +90,10 @@ public void shouldSendTransactionsInBatches() throws Exception {
final Set<Transaction> firstBatch = getTransactionsFromMessage(sentMessages.get(0));
final Set<Transaction> secondBatch = getTransactionsFromMessage(sentMessages.get(1));

assertThat(firstBatch).hasSize(10);
assertThat(secondBatch).hasSize(5);
assertThat(firstBatch).hasSize(2);
assertThat(secondBatch).hasSize(1);

assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(fifteenTransactions);
assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(threeTransactions);
}

private MessageData transactionsMessageContaining(final Transaction... transactions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public int encodedSize() {
return payloadSizes[0];
}

public int uncheckedEncodedSize() {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
return payloadSizes[0];
}

/**
* Write the rlp encoded value to the provided {@link MutableBytesValue}
*
Expand Down