Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Implement max message size rather than limiting with fixed number of transactions #1271

Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ffadf39
Implement max message size rather then cap with fixed number of trans…
AbdelStark Apr 12, 2019
f9de190
fix final variables
AbdelStark Apr 12, 2019
2dfd4ac
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 12, 2019
1357795
Update AbstractRLPOutput.java
AbdelStark Apr 12, 2019
b9052b5
pr discussion
AbdelStark Apr 12, 2019
1d07a42
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 12, 2019
2260bfe
SpotlessApply
AbdelStark Apr 12, 2019
16f21dc
Merge remote-tracking branch 'upstream/master' into feature/pan-1005-…
AbdelStark Apr 15, 2019
2cf5cbd
fix PR discussion
AbdelStark Apr 15, 2019
058cea9
Update LimitedTransactionsMessages.java
AbdelStark Apr 15, 2019
30020e0
Merge remote-tracking branch 'upstream/master' into feature/pan-1005-…
AbdelStark Apr 15, 2019
3eccfb7
fix PR discussion
AbdelStark Apr 15, 2019
e7bf11e
Merge remote-tracking branch 'upstream/master' into feature/pan-1005-…
AbdelStark Apr 15, 2019
40372be
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 15, 2019
398c3ab
Merge remote-tracking branch 'upstream/master' into feature/pan-1005-…
AbdelStark Apr 15, 2019
0073f1e
Update AbstractRLPOutput.java
AbdelStark Apr 15, 2019
40e15d6
Merge branch 'feature/pan-1005-tx-limit-size-send-peers' of https://g…
AbdelStark Apr 15, 2019
bdefa72
Update ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/…
mbaxter Apr 15, 2019
07924dc
Update Transaction.java
AbdelStark Apr 15, 2019
4d059cd
Merge branch 'feature/pan-1005-tx-limit-size-send-peers' of https://g…
AbdelStark Apr 15, 2019
c8ca937
fix PR discussion
AbdelStark Apr 15, 2019
0f94204
fix PR discussion
AbdelStark Apr 15, 2019
8899e78
Update BlockDataGenerator.java
AbdelStark Apr 15, 2019
4066a17
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 17, 2019
2112b83
Update LimitedTransactionsMessagesTest.java
AbdelStark Apr 17, 2019
52cd58e
Update LimitedTransactionsMessagesTest.java
AbdelStark Apr 17, 2019
61579c8
Update LimitedTransactionsMessagesTest.java
AbdelStark Apr 17, 2019
038e4d4
Update LimitedTransactionsMessagesTest.java
AbdelStark Apr 17, 2019
6f3e08f
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 17, 2019
268e065
Merge branch 'master' into feature/pan-1005-tx-limit-size-send-peers
AbdelStark Apr 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.core;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.stream.Collectors.toSet;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;

import tech.pegasys.pantheon.crypto.SECP256K1;
Expand All @@ -31,6 +32,8 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.stream.IntStream;

public class BlockDataGenerator {
private final Random random;
Expand Down Expand Up @@ -241,6 +244,18 @@ public BlockBody body(final BlockOptions options) {
return new BlockBody(options.getTransactions(defaultTxs), ommers);
}

public Transaction transaction(final BytesValue payload) {
return Transaction.builder()
.nonce(positiveLong())
.gasPrice(Wei.wrap(bytes32()))
.gasLimit(positiveLong())
.to(address())
.value(Wei.wrap(bytes32()))
.payload(payload)
.chainId(1)
.signAndBuild(SECP256K1.KeyPair.generate());
}

public Transaction transaction() {
return Transaction.builder()
.nonce(positiveLong())
Expand All @@ -253,6 +268,34 @@ public Transaction transaction() {
.signAndBuild(SECP256K1.KeyPair.generate());
}

public Set<Transaction> transactions(final int n) {
Wei gasPrice = Wei.wrap(bytes32());
long gasLimit = positiveLong();
Address to = address();
Wei value = Wei.wrap(bytes32());
int chainId = 1;
Bytes32 payload = bytes32();
SECP256K1.Signature signature = SECP256K1.sign(payload, SECP256K1.KeyPair.generate());

final Set<Transaction> txs =
IntStream.range(0, n)
.parallel()
.mapToObj(
v ->
new Transaction(
v,
gasPrice,
gasLimit,
Optional.of(to),
value,
signature,
payload,
to,
chainId))
.collect(toSet());
return txs;
}

public TransactionReceipt receipt(final long cumulativeGasUsed) {
return new TransactionReceipt(hash(), cumulativeGasUsed, Arrays.asList(log(), log()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;

import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.HashSet;
import java.util.Set;

public final class LimitedTransactionsMessages {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved

private static final int LIMIT = 1048576;

private final TransactionsMessage transactionsMessage;
private final Set<Transaction> includedTransactions;

public LimitedTransactionsMessages(
final TransactionsMessage transactionsMessage, final Set<Transaction> includedTransactions) {
this.transactionsMessage = transactionsMessage;
this.includedTransactions = includedTransactions;
}

public static LimitedTransactionsMessages createLimited(
final Iterable<Transaction> transactions) {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
final Set<Transaction> includedTransactions = new HashSet<>();
final BytesValueRLPOutput message = new BytesValueRLPOutput();
int messageSize = 0;
message.startList();
for (final Transaction transaction : transactions) {
final BytesValueRLPOutput encodedTransaction = new BytesValueRLPOutput();
transaction.writeTo(encodedTransaction);
BytesValue encodedBytes = encodedTransaction.encoded();
// Break if individual transaction size exceeds limit
if (encodedBytes.size() > LIMIT && (messageSize != 0)) {
break;
}
message.writeRLPUnsafe(encodedBytes);
includedTransactions.add(transaction);
// Check if last transaction to add to the message
messageSize += encodedBytes.size();
if (messageSize > LIMIT) {
break;
}
}
message.endList();
return new LimitedTransactionsMessages(
new TransactionsMessage(message.encoded()), includedTransactions);
}

public final TransactionsMessage getTransactionsMessage() {
return transactionsMessage;
}

public final Set<Transaction> getIncludedTransactions() {
return includedTransactions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static TransactionsMessage create(final Iterable<Transaction> transaction
return new TransactionsMessage(tmp.encoded());
}

private TransactionsMessage(final BytesValue data) {
TransactionsMessage(final BytesValue data) {
super(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,36 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.util.stream.Collectors.toSet;

import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.LimitedTransactionsMessages;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;

import java.util.Set;
import java.util.stream.StreamSupport;

class TransactionsMessageSender {

private static final int MAX_BATCH_SIZE = 10;
private final PeerTransactionTracker transactionTracker;

public TransactionsMessageSender(final PeerTransactionTracker transactionTracker) {
this.transactionTracker = transactionTracker;
}

public void sendTransactionsToPeers() {
transactionTracker.getEthPeersWithUnsentTransactions().forEach(this::sendTransactionsToPeer);
StreamSupport.stream(transactionTracker.getEthPeersWithUnsentTransactions().spliterator(), true)
.parallel()
.forEach(this::sendTransactionsToPeer);
}

private void sendTransactionsToPeer(final EthPeer peer) {
final Set<Transaction> allTxToSend = transactionTracker.claimTransactionsToSendToPeer(peer);
while (!allTxToSend.isEmpty()) {
final Set<Transaction> subsetToSend =
allTxToSend.stream().limit(MAX_BATCH_SIZE).collect(toSet());
allTxToSend.removeAll(subsetToSend);
final LimitedTransactionsMessages limitedTransactionsMessages =
LimitedTransactionsMessages.createLimited(allTxToSend);
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) You could also have LimitedTransactionsMessages.createLimited return a stream of TransactionMessages, then you don't need the extra removeAll bookkeeping and the extra memory for the included tx's list.

try {
peer.send(TransactionsMessage.create(subsetToSend));
peer.send(limitedTransactionsMessages.getTransactionsMessage());
} catch (final PeerNotConnected e) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import org.junit.Test;

public class LimitedTransactionsMessagesTest {

private static final int LIMIT = 1048576;
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved

private final BlockDataGenerator generator = new BlockDataGenerator();
private final Set<Transaction> sampleTxs = generator.transactions(1);
private final TransactionsMessage sampleTransactionMessages =
TransactionsMessage.create(sampleTxs);
private final LimitedTransactionsMessages sampleLimitedTransactionsMessages =
new LimitedTransactionsMessages(sampleTransactionMessages, sampleTxs);

@Test
public void createLimited() {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
final Set<Transaction> txs = generator.transactions(6000);
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(5219, firstMessage.getIncludedTransactions().size());
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(781, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(781, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(0, txs.size());
assertTrue(
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you cut this assertion? I think it just ends up being confusing.

(firstMessage.getTransactionsMessage().getSize()
+ secondMessage.getTransactionsMessage().getSize())
< 2 * LIMIT);
}

@Test
public void createLimitedWithFirstTransactionExceedingLimit() {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
final Set<Transaction> txs = new HashSet<>();
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(2, firstMessage.getIncludedTransactions().size());
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(1, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(0, txs.size());
}

@Test
public void createLimitedWithFirstTransactionExceedingLimit_2() {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
final Set<Transaction> txs = new LinkedHashSet<>();

txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT + 100 - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, firstMessage.getIncludedTransactions().size());
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(2, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(1, txs.size());
final LimitedTransactionsMessages thirdMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, thirdMessage.getIncludedTransactions().size());
txs.removeAll(thirdMessage.getIncludedTransactions());
assertEquals(0, txs.size());
}

@Test
public void getTransactionsMessage() {
assertEquals(
sampleTransactionMessages, sampleLimitedTransactionsMessages.getTransactionsMessage());
}

@Test
public void getIncludedTransactions() {
assertEquals(sampleTxs, sampleLimitedTransactionsMessages.getIncludedTransactions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static com.google.common.collect.Sets.newHashSet;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
Expand All @@ -30,7 +29,6 @@

import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

import com.google.common.collect.Sets;
import org.junit.Test;
Expand All @@ -40,6 +38,7 @@ public class TransactionsMessageSenderTest {

private final EthPeer peer1 = mock(EthPeer.class);
private final EthPeer peer2 = mock(EthPeer.class);

private final BlockDataGenerator generator = new BlockDataGenerator();
private final Transaction transaction1 = generator.transaction();
private final Transaction transaction2 = generator.transaction();
Expand All @@ -63,14 +62,12 @@ public void shouldSendTransactionsToEachPeer() throws Exception {
}

@Test
public void shouldSendTransactionsInBatches() throws Exception {
final Set<Transaction> fifteenTransactions =
IntStream.range(0, 15).mapToObj(number -> generator.transaction()).collect(toSet());
fifteenTransactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
final Set<Transaction> transactions = generator.transactions(6000);

messageSender.sendTransactionsToPeers();
transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));

messageSender.sendTransactionsToPeers();
final ArgumentCaptor<MessageData> messageDataArgumentCaptor =
ArgumentCaptor.forClass(MessageData.class);
verify(peer1, times(2)).send(messageDataArgumentCaptor.capture());
Expand All @@ -82,10 +79,10 @@ public void shouldSendTransactionsInBatches() throws Exception {
final Set<Transaction> firstBatch = getTransactionsFromMessage(sentMessages.get(0));
final Set<Transaction> secondBatch = getTransactionsFromMessage(sentMessages.get(1));

assertThat(firstBatch).hasSize(10);
assertThat(secondBatch).hasSize(5);
assertThat(firstBatch).hasSize(5219);
assertThat(secondBatch).hasSize(781);

assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(fifteenTransactions);
assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(transactions);
}

private MessageData transactionsMessageContaining(final Transaction... transactions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ public int encodedSize() {
checkState(stackSize == 1, "A list has been entered (startList()) but not left (endList())");
return payloadSizes[0];
}

/**
* Write the rlp encoded value to the provided {@link MutableBytesValue}
*
Expand Down