Skip to content

Commit

Permalink
Update removeExpiredEntries to remove all items in a batch
Browse files Browse the repository at this point in the history
This will cause HashMapChangedListeners to receive just one onRemoved()
call for the expire work instead of multiple onRemoved() calls for each
item.

This required a bit of updating for the remove validation in tests so
that it correctly compares onRemoved with multiple items.
  • Loading branch information
julianknutsen committed Nov 19, 2019
1 parent 489b25a commit eae641e
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 32 deletions.
10 changes: 4 additions & 6 deletions p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,11 @@ void removeExpiredEntries() {
// Batch processing can cause performance issues, so we give listeners a chance to deal with it by notifying
// about start and end of iteration.
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataStarted);
toRemoveList.forEach(mapEntry -> {
ProtectedStorageEntry protectedStorageEntry = mapEntry.getValue();
ByteArray payloadHash = mapEntry.getKey();

log.debug("We found an expired data entry. We remove the protectedData:\n\t" + Utilities.toTruncatedString(protectedStorageEntry));
removeFromMapAndDataStore(protectedStorageEntry, payloadHash);
toRemoveList.forEach(toRemoveItem -> {
log.debug("We found an expired data entry. We remove the protectedData:\n\t" +
Utilities.toTruncatedString(toRemoveItem.getValue()));
});
removeFromMapAndDataStore(toRemoveList);
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataCompleted);

if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.security.KeyPair;
import java.security.NoSuchAlgorithmException;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
Expand Down Expand Up @@ -149,17 +150,21 @@ public void removeExpiredEntries_ExpiresExpiredPersistableExpirableEntries() thr
public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchAlgorithmException {
final int initialClockIncrement = 5;

ArrayList<ProtectedStorageEntry> expectedRemoves = new ArrayList<>();

// Add 4 entries to our sequence number map that will be purged
KeyPair purgedOwnerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload purgedProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(purgedOwnerKeys.getPublic(), 0);
ProtectedStorageEntry purgedProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(purgedProtectedStoragePayload, purgedOwnerKeys);
expectedRemoves.add(purgedProtectedStorageEntry);

Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, true));

for (int i = 0; i < MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE - 1; ++i) {
KeyPair ownerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload protectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(ownerKeys.getPublic(), 0);
ProtectedStorageEntry tmpEntry = testState.mockedStorage.getProtectedStorageEntry(protectedStoragePayload, ownerKeys);
expectedRemoves.add(tmpEntry);
Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(tmpEntry, TestState.getTestNodeAddress(), null, true));
}

Expand All @@ -171,24 +176,25 @@ public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchA
KeyPair keepOwnerKeys = TestUtils.generateKeyPair();
ProtectedStoragePayload keepProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(keepOwnerKeys.getPublic(), 0);
ProtectedStorageEntry keepProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(keepProtectedStoragePayload, keepOwnerKeys);
expectedRemoves.add(keepProtectedStorageEntry);

Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, true));

// P2PDataStorage::PURGE_AGE_DAYS == 10 days
// Advance time past it so they will be valid purge targets
this.testState.clockFake.increment(TimeUnit.DAYS.toMillis(P2PDataStorage.PURGE_AGE_DAYS + 1 - initialClockIncrement));

// The first entry (11 days old) should be purged
// The first 4 entries (11 days old) should be purged from the SequenceNumberMap
SavedTestState beforeState = this.testState.saveTestState(purgedProtectedStorageEntry);
this.testState.mockedStorage.removeExpiredEntries();
this.testState.verifyProtectedStorageRemove(beforeState, purgedProtectedStorageEntry, true, false, false, false);
this.testState.verifyProtectedStorageRemove(beforeState, expectedRemoves, true, false, false, false);

// Which means that an addition of a purged entry should succeed.
beforeState = this.testState.saveTestState(purgedProtectedStorageEntry);
Assert.assertTrue(this.testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, false));
this.testState.verifyProtectedStorageAdd(beforeState, purgedProtectedStorageEntry, true, false);

// The second entry (5 days old) should still exist which means trying to add it again should fail.
// The last entry (5 days old) should still exist in the SequenceNumberMap which means trying to add it again should fail.
beforeState = this.testState.saveTestState(keepProtectedStorageEntry);
Assert.assertFalse(this.testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, false));
this.testState.verifyProtectedStorageAdd(beforeState, keepProtectedStorageEntry, false, false);
Expand Down
79 changes: 56 additions & 23 deletions p2p/src/test/java/bisq/network/p2p/storage/TestState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@

import java.security.PublicKey;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
Expand Down Expand Up @@ -205,38 +208,68 @@ void verifyProtectedStorageRemove(SavedTestState beforeState,
boolean expectedBroadcastOnStateChange,
boolean expectedSeqNrWriteOnStateChange,
boolean expectedIsDataOwner) {
P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());

verifyProtectedStorageRemove(beforeState, Collections.singletonList(protectedStorageEntry),
expectedStateChange, expectedBroadcastOnStateChange,
expectedSeqNrWriteOnStateChange, expectedIsDataOwner);
}

void verifyProtectedStorageRemove(SavedTestState beforeState,
Collection<ProtectedStorageEntry> protectedStorageEntries,
boolean expectedStateChange,
boolean expectedBroadcastOnStateChange,
boolean expectedSeqNrWriteOnStateChange,
boolean expectedIsDataOwner) {

// The default matcher expects orders to stay the same. So, create a custom matcher function since
// we don't care about the order.
if (expectedStateChange) {
Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash));
final ArgumentCaptor<Collection<ProtectedStorageEntry>> argument = ArgumentCaptor.forClass(Collection.class);
verify(this.hashMapChangedListener).onRemoved(argument.capture());

if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) {
Assert.assertNull(this.mockedStorage.getProtectedDataStoreMap().get(storageHash));
Set<ProtectedStorageEntry> actual = new HashSet<>(argument.getValue());
Set<ProtectedStorageEntry> expected = new HashSet<>(protectedStorageEntries);

verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry);
}
// Ensure we didn't remove duplicates
Assert.assertEquals(protectedStorageEntries.size(), expected.size());
Assert.assertEquals(argument.getValue().size(), actual.size());
Assert.assertEquals(expected, actual);
} else {
verify(this.hashMapChangedListener, never()).onRemoved(any());
}

verify(this.hashMapChangedListener).onRemoved(Collections.singletonList(protectedStorageEntry));
protectedStorageEntries.forEach(protectedStorageEntry -> {
P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());
P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload());

if (expectedSeqNrWriteOnStateChange)
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());
if (expectedStateChange) {
Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash));

if (expectedBroadcastOnStateChange) {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
else
verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
}
if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) {
Assert.assertNull(this.mockedStorage.getProtectedDataStoreMap().get(storageHash));

} else {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));
verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry);
}

verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}
if (expectedSeqNrWriteOnStateChange)
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());

if (expectedBroadcastOnStateChange) {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
else
verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner));
}

} else {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));

verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}
});
}

void verifyRefreshTTL(SavedTestState beforeState,
Expand Down

0 comments on commit eae641e

Please sign in to comment.