Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Limit ibft msg queues #704

Merged
merged 24 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4fa3284
move evicting map into it's own class
jframe Jan 29, 2019
fa03a32
use evicting map for future msgs in ibftController and roundChangeMan…
jframe Jan 29, 2019
593b7ca
add generics on evict map eviction method
jframe Jan 29, 2019
bb6532e
config for message buffer size
jframe Jan 29, 2019
968e758
test that MaxSizeEvictingMap evicts records when at capacity
jframe Jan 29, 2019
cf639a7
rename map type
jframe Jan 30, 2019
11be71c
Merge branch 'master' into limit_ibft_msg_queues
jframe Jan 30, 2019
aa3e2d6
use SizeLimitedMap in the UniqueMessageMulticaster
jframe Jan 30, 2019
16467b8
set event queue limit from the ibft message buffer size config
jframe Jan 30, 2019
d385a15
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 1, 2019
3695c51
revert map and queue changes to future messages and round change and …
jframe Feb 1, 2019
901a331
spotless
jframe Feb 1, 2019
13e3388
fix test context after config changes
jframe Feb 1, 2019
363f507
update config names
jframe Feb 1, 2019
b64829e
update config names
jframe Feb 3, 2019
f5b1e2e
Update gossiped history limit to 1000
jframe Feb 3, 2019
4afd709
Update gossiped history limit to 1000 - test
jframe Feb 3, 2019
08b79fe
comment on why default gossipped history limit was chosen
jframe Feb 4, 2019
97af876
update field names to match new config names
jframe Feb 4, 2019
be0bd99
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 4, 2019
d32c128
Merge branch 'master' into limit_ibft_msg_queues
CjHare Feb 6, 2019
96a3ab7
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 6, 2019
14ab492
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 6, 2019
66a5279
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class IbftConfigOptions {
private static final long DEFAULT_EPOCH_LENGTH = 30_000;
private static final int DEFAULT_BLOCK_PERIOD_SECONDS = 1;
private static final int DEFAULT_ROUND_EXPIRY_SECONDS = 1;
private static final int DEFAULT_MESSAGE_BUFFER_SIZE = 10_000;

private final JsonObject ibftConfigRoot;

Expand All @@ -39,4 +40,8 @@ public int getBlockPeriodSeconds() {
public int getRequestTimeoutSeconds() {
return ibftConfigRoot.getInteger("requesttimeoutseconds", DEFAULT_ROUND_EXPIRY_SECONDS);
}

public int getMessageBufferSize() {
return ibftConfigRoot.getInteger("messagebuffersize", DEFAULT_MESSAGE_BUFFER_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class IbftConfigOptionsTest {
private static final int EXPECTED_DEFAULT_EPOCH_LENGTH = 30_000;
private static final int EXPECTED_DEFAULT_BLOCK_PERIOD = 1;
private static final int EXPECTED_DEFAULT_REQUEST_TIMEOUT = 1;
private static final int EXPECTED_DEFAULT_MESSAGE_BUFFER_SIZE = 10_000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these oddly named? I.e. what does "EXPECTED_" indicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are bit oddly named I guess. The "EXPECTED_" indicates this expected values for the assertion. These values are used to assert the default values are correct in the tests. It's existing test code so I just followed the existing pattern.


@Test
public void shouldGetEpochLengthFromConfig() {
Expand Down Expand Up @@ -80,6 +81,24 @@ public void shouldGetDefaultRequestTimeoutFromDefaultConfig() {
.isEqualTo(EXPECTED_DEFAULT_REQUEST_TIMEOUT);
}

@Test
public void shouldGetMessageBufferSizeFromConfig() {
final IbftConfigOptions config = fromConfigOptions(singletonMap("MessageBufferSize", 5_000));
assertThat(config.getMessageBufferSize()).isEqualTo(5_000);
}

@Test
public void shouldFallbackToDefaultMessageBufferSize() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getMessageBufferSize()).isEqualTo(EXPECTED_DEFAULT_MESSAGE_BUFFER_SIZE);
}

@Test
public void shouldGetDefaultMessageBufferSizeFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getMessageBufferSize())
.isEqualTo(EXPECTED_DEFAULT_MESSAGE_BUFFER_SIZE);
}

private IbftConfigOptions fromConfigOptions(final Map<String, Object> ibftConfigOptions) {
return GenesisConfigFile.fromConfig(
new JsonObject(singletonMap("config", singletonMap("ibft", ibftConfigOptions))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public EventMultiplexer getEventMultiplexer() {
public static final int EPOCH_LENGTH = 10_000;
public static final int BLOCK_TIMER_SEC = 3;
public static final int ROUND_TIMER_SEC = 12;
public static final int MESSAGE_BUFFER_SIZE = 10_000;

private Clock clock = Clock.fixed(Instant.MIN, ZoneId.of("UTC"));
private IbftEventQueue ibftEventQueue = new IbftEventQueue();
Expand Down Expand Up @@ -158,7 +159,8 @@ public TestContext build() {
// Use a stubbed version of the multicaster, to prevent creating PeerConnections etc.
final StubValidatorMulticaster multicaster = new StubValidatorMulticaster();

final UniqueMessageMulticaster uniqueMulticaster = new UniqueMessageMulticaster(multicaster);
final UniqueMessageMulticaster uniqueMulticaster =
new UniqueMessageMulticaster(multicaster, MESSAGE_BUFFER_SIZE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should Multicaster be updated as part of this change? Or is that a separate problem?
Is the size of the multicaster buffer related to the incoming event queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multicaster should be updated to have a limited size buffer. Would be better to have a seperate config option though.


final Gossiper gossiper = useGossip ? new IbftGossip(uniqueMulticaster) : mock(Gossiper.class);

Expand Down Expand Up @@ -301,7 +303,8 @@ private static ControllerAndState createControllerAndFinalState(
finalState,
new IbftRoundFactory(
finalState, protocolContext, protocolSchedule, minedBlockObservers),
messageValidatorFactory),
messageValidatorFactory,
MESSAGE_BUFFER_SIZE),
gossiper,
new HashMap<>());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import java.util.LinkedHashMap;
import java.util.Map.Entry;

/**
* Map that is limited to a specified size and will evict oldest entries when the size limit is
* reached.
*/
public class SizeLimitedMap<K, V> extends LinkedHashMap<K, V> {
private final int maxEntries;

public SizeLimitedMap(final int maxEntries) {
this.maxEntries = maxEntries;
}

@Override
protected boolean removeEldestEntry(final Entry<K, V> ignored) {
return size() > maxEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,33 @@
*/
package tech.pegasys.pantheon.consensus.ibft;

import static java.util.Collections.newSetFromMap;

import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class UniqueMessageMulticaster implements ValidatorMulticaster {

private final int maxSeenMessages;
private final ValidatorMulticaster multicaster;

UniqueMessageMulticaster(final ValidatorMulticaster multicaster, final int maxSeenMessages) {
this.maxSeenMessages = maxSeenMessages;
this.multicaster = multicaster;
}
private final Set<Integer> seenMessages;

/**
* Constructor that attaches gossip logic to a set of multicaster
*
* @param multicaster Network connections to the remote validators
* @param maxSeenMessages Maximum messages to track as seen
*/
public UniqueMessageMulticaster(final ValidatorMulticaster multicaster) {
this(multicaster, 10_000);
public UniqueMessageMulticaster(
final ValidatorMulticaster multicaster, final int maxSeenMessages) {
this.multicaster = multicaster;
// Set that starts evicting members when it hits capacity
this.seenMessages = newSetFromMap(new SizeLimitedMap<>(maxSeenMessages));
}

// Set that starts evicting members when it hits capacity
private final Set<Integer> seenMessages =
Collections.newSetFromMap(
new LinkedHashMap<Integer, Boolean>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Integer, Boolean> eldest) {
return size() > maxSeenMessages;
}
});

@Override
public void send(final MessageData message) {
send(message, Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.pegasys.pantheon.consensus.ibft.BlockTimer;
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.SizeLimitedMap;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.network.IbftMessageTransmitter;
import tech.pegasys.pantheon.consensus.ibft.payload.CommitPayload;
Expand All @@ -43,7 +44,6 @@
import java.util.function.Consumer;
import java.util.function.Function;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -64,7 +64,7 @@ public class IbftBlockHeightManager implements BlockHeightManager {
private final BlockTimer blockTimer;
private final IbftMessageTransmitter transmitter;
private final MessageFactory messageFactory;
private final Map<Integer, RoundState> futureRoundStateBuffer = Maps.newHashMap();
private final Map<Integer, RoundState> futureRoundStateBuffer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this related to the incoming event queue size?

private final NewRoundMessageValidator newRoundMessageValidator;
private final Clock clock;
private final Function<ConsensusRoundIdentifier, RoundState> roundStateCreator;
Expand All @@ -80,7 +80,8 @@ public IbftBlockHeightManager(
final RoundChangeManager roundChangeManager,
final IbftRoundFactory ibftRoundFactory,
final Clock clock,
final MessageValidatorFactory messageValidatorFactory) {
final MessageValidatorFactory messageValidatorFactory,
final int maxFutureRoundStateBufferSize) {
this.parentHeader = parentHeader;
this.roundFactory = ibftRoundFactory;
this.roundTimer = finalState.getRoundTimer();
Expand All @@ -91,6 +92,7 @@ public IbftBlockHeightManager(
this.roundChangeManager = roundChangeManager;
this.finalState = finalState;

futureRoundStateBuffer = new SizeLimitedMap<>(maxFutureRoundStateBufferSize);
newRoundMessageValidator = messageValidatorFactory.createNewRoundValidator(parentHeader);

roundStateCreator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ public class IbftBlockHeightManagerFactory {
private final IbftRoundFactory roundFactory;
private final IbftFinalState finalState;
private final MessageValidatorFactory messageValidatorFactory;
private final int messageBufferSize;

public IbftBlockHeightManagerFactory(
final IbftFinalState finalState,
final IbftRoundFactory roundFactory,
final MessageValidatorFactory messageValidatorFactory) {
final MessageValidatorFactory messageValidatorFactory,
final int messageBufferSize) {
this.roundFactory = roundFactory;
this.finalState = finalState;
this.messageValidatorFactory = messageValidatorFactory;
this.messageBufferSize = messageBufferSize;
}

public BlockHeightManager create(final BlockHeader parentHeader) {
Expand All @@ -52,6 +55,7 @@ private BlockHeightManager createFullBlockHeightManager(final BlockHeader parent
messageValidatorFactory.createRoundChangeMessageValidator(parentHeader)),
roundFactory,
finalState.getClock(),
messageValidatorFactory);
messageValidatorFactory,
messageBufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.Gossiper;
import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.SizeLimitedMap;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
Expand All @@ -40,7 +41,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -58,8 +58,14 @@ public IbftController(
final Blockchain blockchain,
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final IbftGossip gossiper) {
this(blockchain, ibftFinalState, ibftBlockHeightManagerFactory, gossiper, Maps.newHashMap());
final IbftGossip gossiper,
final int messageBufferSize) {
this(
blockchain,
ibftFinalState,
ibftBlockHeightManagerFactory,
gossiper,
new SizeLimitedMap<>(messageBufferSize));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.consensus.ibft.statemachine;

import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.SizeLimitedMap;
import tech.pegasys.pantheon.consensus.ibft.payload.RoundChangeCertificate;
import tech.pegasys.pantheon.consensus.ibft.payload.RoundChangePayload;
import tech.pegasys.pantheon.consensus.ibft.payload.SignedData;
Expand Down Expand Up @@ -44,7 +45,8 @@ public static class RoundChangeStatus {

// Store only 1 round change per round per validator
@VisibleForTesting
final Map<Address, SignedData<RoundChangePayload>> receivedMessages = Maps.newLinkedHashMap();
final Map<Address, SignedData<RoundChangePayload>> receivedMessages =
new SizeLimitedMap<>(10_000);

private boolean actioned = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Test;

public class SizeLimitedMapTest {

@Test
public void evictMessageRecordAtCapacity() {
SizeLimitedMap<String, Boolean> map = new SizeLimitedMap<>(5);

map.put("message1", true);
assertThat(map).hasSize(1);

// add messages so map is at capacity
for (int i = 2; i <= 5; i++) {
map.put("message" + i, true);
}
assertThat(map).hasSize(5);

map.put("message6", false);
assertThat(map).hasSize(5);
assertThat(map.keySet()).doesNotContain("message1");
assertThat(map.keySet()).contains("message2", "message3", "message4", "message5", "message6");

map.put("message7", true);
assertThat(map).hasSize(5);
assertThat(map.keySet()).doesNotContain("message1", "message2");
assertThat(map.keySet()).contains("message3", "message4", "message5", "message6", "message7");
}
}
Loading