Skip to content

Commit

Permalink
[REFACTOR] HashMapListener::onAdded/onRemoved
Browse files Browse the repository at this point in the history
Previously, this interface was called each time an item was changed. This
required listeners to understand performance implications of multiple
adds or removes in a short time span.

Instead, give each listener the ability to process a list of added or
removed entrys which can help them avoid performance issues.

This patch is just a refactor. Each listener is called once for each
ProtectedStorageEntry. Future patches will change this.
  • Loading branch information
julianknutsen committed Nov 18, 2019
1 parent 3ea0a16 commit 78211a6
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 77 deletions.
32 changes: 19 additions & 13 deletions core/src/main/java/bisq/core/alert/AlertManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

import java.math.BigInteger;

import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,22 +81,26 @@ public AlertManager(P2PService p2PService,
if (!ignoreDevMsg) {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
});
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javafx.collections.ObservableList;
import javafx.collections.transformation.FilteredList;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -130,17 +131,21 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -133,13 +134,17 @@ public void start() {
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
onProtectedDataAdded(entry, true);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
onProtectedDataAdded(protectedStorageEntry, true);
});
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
onProtectedDataRemoved(entry);
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
onProtectedDataRemoved(protectedStorageEntry);
});
}


Expand Down
31 changes: 18 additions & 13 deletions core/src/main/java/bisq/core/filter/FilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.math.BigInteger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -133,23 +134,27 @@ public void onAllServicesInitialized() {

p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(data), 1);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload();
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(protectedStorageEntry), 1);
}
}
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
if (verifySignature(filter))
resetFilters();
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload();
if (verifySignature(filter))
resetFilters();
}
});
}
});
}
Expand Down
37 changes: 21 additions & 16 deletions core/src/main/java/bisq/core/offer/OfferBookService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.io.File;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -87,26 +88,30 @@ public OfferBookService(P2PService p2PService,

p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (data.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onAdded(offer);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
offerBookChangedListeners.stream().forEach(listener -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onAdded(offer);
}
});
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (data.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onRemoved(offer);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
offerBookChangedListeners.stream().forEach(listener -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onRemoved(offer);
}
});
});
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.math.BigInteger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -131,18 +132,22 @@ public DisputeAgentManager(KeyRing keyRing,
public void onAllServicesInitialized() {
disputeAgentService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
if (isExpectedInstance(data)) {
updateMap();
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (isExpectedInstance(protectedStorageEntry)) {
updateMap();
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
if (isExpectedInstance(data)) {
updateMap();
removeAcceptedDisputeAgentFromUser(data);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (isExpectedInstance(protectedStorageEntry)) {
updateMap();
removeAcceptedDisputeAgentFromUser(protectedStorageEntry);
}
});
}
});

Expand Down
13 changes: 7 additions & 6 deletions p2p/src/main/java/bisq/network/p2p/P2PService.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.security.PublicKey;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -432,15 +433,15 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry protectedStorageEntry) {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
}

public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) { }

///////////////////////////////////////////////////////////////////////////////////////////
// DirectMessages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import bisq.network.p2p.storage.payload.ProtectedStorageEntry;

import java.util.Collection;

public interface HashMapChangedListener {
void onAdded(ProtectedStorageEntry data);
void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries);

@SuppressWarnings("UnusedParameters")
void onRemoved(ProtectedStorageEntry data);
void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries);

// We process all expired entries after a delay (60 s) after onBootstrapComplete.
// We notify listeners of start and completion so they can optimize to only update after batch processing is done.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.time.Clock;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -409,7 +410,7 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn

// This is an updated entry. Record it and signal listeners.
map.put(hashOfPayload, protectedStorageEntry);
hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry));
hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry)));

// Record the updated sequence number and persist it. Higher delay so we can batch more items.
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
Expand Down Expand Up @@ -643,7 +644,7 @@ public void removeProtectedDataStoreListener(ProtectedDataStoreListener listener

private void removeFromMapAndDataStore(ProtectedStorageEntry protectedStorageEntry, ByteArray hashOfPayload) {
map.remove(hashOfPayload);
hashMapChangedListeners.forEach(e -> e.onRemoved(protectedStorageEntry));
hashMapChangedListeners.forEach(e -> e.onRemoved(Collections.singletonList(protectedStorageEntry)));

ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof PersistablePayload) {
Expand Down
9 changes: 5 additions & 4 deletions p2p/src/test/java/bisq/network/p2p/storage/TestState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import java.security.PublicKey;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
Expand Down Expand Up @@ -174,7 +175,7 @@ void verifyProtectedStorageAdd(SavedTestState beforeState,
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
}

verify(this.hashMapChangedListener).onAdded(protectedStorageEntry);
verify(this.hashMapChangedListener).onAdded(Collections.singletonList(protectedStorageEntry));

final ArgumentCaptor<BroadcastMessage> captor = ArgumentCaptor.forClass(BroadcastMessage.class);
verify(this.mockBroadcaster).broadcast(captor.capture(), any(NodeAddress.class),
Expand All @@ -192,7 +193,7 @@ void verifyProtectedStorageAdd(SavedTestState beforeState,
verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());

// Internal state didn't change... nothing should be notified
verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry);
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}
Expand All @@ -216,7 +217,7 @@ void verifyProtectedStorageRemove(SavedTestState beforeState,
verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry);
}

verify(this.hashMapChangedListener).onRemoved(protectedStorageEntry);
verify(this.hashMapChangedListener).onRemoved(Collections.singletonList(protectedStorageEntry));

if (expectedSeqNrWriteOnStateChange)
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());
Expand All @@ -232,7 +233,7 @@ void verifyProtectedStorageRemove(SavedTestState beforeState,
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));

verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry);
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}
Expand Down

0 comments on commit 78211a6

Please sign in to comment.