diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 3ecc098b686..c1f54bb4297 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -316,33 +316,41 @@ public boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, boolean reBroadcast, boolean checkDate) { log.trace("addPersistableNetworkPayload payload={}", payload); - byte[] hash = payload.getHash(); - if (payload.verifyHashSize()) { - ByteArray hashAsByteArray = new ByteArray(hash); - boolean containsKey = getAppendOnlyDataStoreMap().containsKey(hashAsByteArray); - if (!containsKey || reBroadcast) { - if (!(payload instanceof DateTolerantPayload) || !checkDate || ((DateTolerantPayload) payload).isDateInTolerance(clock)) { - if (!containsKey) { - appendOnlyDataStoreService.put(hashAsByteArray, payload); - appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload)); - } - if (allowBroadcast) - broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender, null, isDataOwner); - - return true; - } else { - log.warn("Publish date of payload is not matching our current time and outside of our tolerance.\n" + - "Payload={}; now={}", payload.toString(), new Date()); - return false; - } - } else { - log.trace("We have that payload already in our map."); - return false; - } - } else { - log.warn("We got a hash exceeding our permitted size"); + + // Payload hash size does not match expectation for that type of message. + if (!payload.verifyHashSize()) { + log.warn("addPersistableNetworkPayload failed due to unexpected hash size"); return false; } + + ByteArray hashAsByteArray = new ByteArray(payload.getHash()); + boolean payloadHashAlreadyInStore = getAppendOnlyDataStoreMap().containsKey(hashAsByteArray); + + // Store already knows about this payload. Ignore it unless the caller specifically requests a republish. + if (payloadHashAlreadyInStore && !reBroadcast) { + log.trace("addPersistableNetworkPayload failed due to duplicate payload"); + return false; + } + + // DateTolerantPayloads are only checked for tolerance from the onMessage handler (checkDate == true). If not in + // tolerance, ignore it. + if (checkDate && payload instanceof DateTolerantPayload && !((DateTolerantPayload) payload).isDateInTolerance((clock))) { + log.warn("addPersistableNetworkPayload failed due to payload time outside tolerance.\n" + + "Payload={}; now={}", payload.toString(), new Date()); + return false; + } + + // Add the payload and publish the state update to the appendOnlyDataStoreListeners + if (!payloadHashAlreadyInStore) { + appendOnlyDataStoreService.put(hashAsByteArray, payload); + appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload)); + } + + // Broadcast the payload if requested by caller + if (allowBroadcast) + broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender, null, isDataOwner); + + return true; } // When we receive initial data we skip several checks to improve performance. We requested only missing entries so we @@ -382,50 +390,50 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn return false; } - boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload); - boolean result = sequenceNrValid && - checkPublicKeys(protectedStorageEntry, true) - && checkSignature(protectedStorageEntry); + // TODO: Combine with hasSequenceNrIncreased check, but keep existing behavior for now + if(!isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) + return false; + + // Verify the ProtectedStorageEntry is well formed and valid for the add operation + if (!checkPublicKeys(protectedStorageEntry, true) || !checkSignature(protectedStorageEntry)) + return false; boolean containsKey = map.containsKey(hashOfPayload); - if (containsKey) { - result = result && checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload); - } - // printData("before add"); - if (result) { - boolean hasSequenceNrIncreased = hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload); + // If we have already seen an Entry with the same hash, verify the new Entry has the same owner + if (containsKey && !checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload)) + return false; - if (!containsKey || hasSequenceNrIncreased) { - // At startup we don't have the item so we store it. At updates of the seq nr we store as well. - map.put(hashOfPayload, protectedStorageEntry); - hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry)); - // printData("after add"); - } else { - log.trace("We got that version of the data already, so we don't store it."); - } + boolean hasSequenceNrIncreased = hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload); - if (hasSequenceNrIncreased) { - sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis())); - // We set the delay higher as we might receive a batch of items - sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 2000); + // If we have seen a more recent operation for this payload, we ignore the current one + // TODO: I think we can return false here. All callers use the Client API (addProtectedStorageEntry(getProtectedStorageEntry()) + // leaving only the onMessage() handler which doesn't look at the return value. It makes more intuitive sense that adds() that don't + // change state return false. + if (!hasSequenceNrIncreased) + return true; - if (allowBroadcast) - broadcastProtectedStorageEntry(protectedStorageEntry, sender, listener, isDataOwner); - } else { - log.trace("We got that version of the data already, so we don't broadcast it."); - } + // This is an updated entry. Record it and signal listeners. + map.put(hashOfPayload, protectedStorageEntry); + hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry)); - if (protectedStoragePayload instanceof PersistablePayload) { - ByteArray compactHash = getCompactHashAsByteArray(protectedStoragePayload); - ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(compactHash, protectedStorageEntry); - if (previous == null) - protectedDataStoreListeners.forEach(e -> e.onAdded(protectedStorageEntry)); - } - } else { - log.trace("add failed"); + // Record the updated sequence number and persist it. Higher delay so we can batch more items. + sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis())); + sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 2000); + + // Optionally, broadcast the add/update depending on the calling environment + if (allowBroadcast) + broadcastProtectedStorageEntry(protectedStorageEntry, sender, listener, isDataOwner); + + // Persist ProtectedStorageEntrys carrying PersistablePayload payloads and signal listeners on changes + if (protectedStoragePayload instanceof PersistablePayload) { + ByteArray compactHash = P2PDataStorage.getCompactHashAsByteArray(protectedStoragePayload); + ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(compactHash, protectedStorageEntry); + if (previous == null) + protectedDataStoreListeners.forEach(e -> e.onAdded(protectedStorageEntry)); } - return result; + + return true; } private void broadcastProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, @@ -439,39 +447,58 @@ public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, @Nullable NodeAddress sender, boolean isDataOwner) { ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload()); - if (map.containsKey(hashOfPayload)) { - ProtectedStorageEntry storedData = map.get(hashOfPayload); - int sequenceNumber = refreshTTLMessage.getSequenceNumber(); - if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) { - log.trace("We got that message with that seq nr already from another peer. We ignore that message."); - return true; - } else { - PublicKey ownerPubKey = storedData.getProtectedStoragePayload().getOwnerPubKey(); - byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr(); - byte[] signature = refreshTTLMessage.getSignature(); - // printData("before refreshTTL"); - if (hasSequenceNrIncreased(sequenceNumber, hashOfPayload) && - checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload) && - checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature)) { - log.debug("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100)); - storedData.refreshTTL(); - storedData.updateSequenceNumber(sequenceNumber); - storedData.updateSignature(signature); - printData("after refreshTTL"); - sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis())); - sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 1000); - - broadcast(refreshTTLMessage, sender, null, isDataOwner); - return true; - } - - return false; - } - } else { + if (!map.containsKey((hashOfPayload))) { log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing."); + return false; } + + int sequenceNumber = refreshTTLMessage.getSequenceNumber(); + + // If we have seen a more recent operation for this payload, we ignore the current one + // TODO: I think we can return false here. All callers use the Client API (refreshTTL(getRefreshTTLMessage()) which increments the sequence number + // leaving only the onMessage() handler which doesn't look at the return value. It makes more intuitive sense that operations that don't + // change state return false. + if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) { + log.trace("We got that message with that seq nr already from another peer. We ignore that message."); + + return true; + } + + // TODO: Combine with above in future work, but preserve existing behavior for now + if(!hasSequenceNrIncreased(sequenceNumber, hashOfPayload)) + return false; + + ProtectedStorageEntry storedData = map.get(hashOfPayload); + PublicKey ownerPubKey = storedData.getProtectedStoragePayload().getOwnerPubKey(); + byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr(); + byte[] signature = refreshTTLMessage.getSignature(); + + // Verify the RefreshOfferMessage is well formed and valid for the refresh operation + if (!checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature)) + return false; + + // Verify the Payload owner and the Entry owner for the stored Entry are the same + // TODO: This is also checked in the validation for the original add(), investigate if this can be removed + if (!checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload)) + return false; + + // This is a valid refresh, update the payload for it + log.debug("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100)); + storedData.refreshTTL(); + storedData.updateSequenceNumber(sequenceNumber); + storedData.updateSignature(signature); + printData("after refreshTTL"); + + // Record the latest sequence number and persist it + sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis())); + sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 1000); + + // Always broadcast refreshes + broadcast(refreshTTLMessage, sender, null, isDataOwner); + + return true; } public boolean remove(ProtectedStorageEntry protectedStorageEntry, @@ -481,32 +508,42 @@ public boolean remove(ProtectedStorageEntry protectedStorageEntry, ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); - boolean containsKey = map.containsKey(hashOfPayload); - if (!containsKey) + + // If we don't know about the target of this remove, ignore it + if (!map.containsKey(hashOfPayload)) { log.debug("Remove data ignored as we don't have an entry for that data."); - boolean result = containsKey - && checkPublicKeys(protectedStorageEntry, false) - && isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload) - && checkSignature(protectedStorageEntry) - && checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload); - // printData("before remove"); - if (result) { - doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload); - printData("after remove"); - sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis())); - sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300); + return false; + } - maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload); + // If we have seen a more recent operation for this payload, ignore this one + if (!isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) + return false; - broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner); + // Verify the ProtectedStorageEntry is well formed and valid for the remove operation + if (!checkPublicKeys(protectedStorageEntry, false) || !checkSignature(protectedStorageEntry)) + return false; - removeFromProtectedDataStore(protectedStorageEntry); - } else { - log.debug("remove failed"); - } - return result; - } + // If we have already seen an Entry with the same hash, verify the new Entry has the same owner + if (!checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload)) + return false; + + // Valid remove entry, do the remove and signal listeners + doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload); + printData("after remove"); + + // Record the latest sequence number and persist it + sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis())); + sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300); + + maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload); + + broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner); + + removeFromProtectedDataStore(protectedStorageEntry); + + return true; +} /** @@ -559,33 +596,46 @@ public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxSt boolean isDataOwner) { ProtectedStoragePayload protectedStoragePayload = protectedMailboxStorageEntry.getProtectedStoragePayload(); ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); - boolean containsKey = map.containsKey(hashOfPayload); - if (!containsKey) - log.debug("Remove data ignored as we don't have an entry for that data."); + + if (!map.containsKey(hashOfPayload)) { + log.debug("removeMailboxData failed due to unknown entry"); + + return false; + } int sequenceNumber = protectedMailboxStorageEntry.getSequenceNumber(); + + if (!isSequenceNrValid(sequenceNumber, hashOfPayload)) + return false; + PublicKey receiversPubKey = protectedMailboxStorageEntry.getReceiversPubKey(); - boolean result = containsKey && - isSequenceNrValid(sequenceNumber, hashOfPayload) && - checkPublicKeys(protectedMailboxStorageEntry, false) && - protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(receiversPubKey) && // at remove both keys are the same (only receiver is able to remove data) - checkSignature(protectedMailboxStorageEntry) && - checkIfStoredMailboxDataMatchesNewMailboxData(receiversPubKey, hashOfPayload); - - // printData("before removeMailboxData"); - if (result) { - doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfPayload); - printData("after removeMailboxData"); - sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis())); - sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300); - - maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload); - - broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner); - } else { - log.debug("removeMailboxData failed"); + + if (!checkPublicKeys(protectedMailboxStorageEntry, false) || !checkSignature(protectedMailboxStorageEntry)) + return false; + + // Verify the Entry has the correct receiversPubKey for removal. + if (!protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(receiversPubKey)) { + log.debug("Entry receiversPubKey does not match payload owner which is a requirement for removing MailboxStoragePayloads"); + return false; } - return result; + + // If we have already seen an Entry with the same hash, verify the new Entry has the same owner + if (!checkIfStoredMailboxDataMatchesNewMailboxData(receiversPubKey, hashOfPayload)) + return false; + + // Valid remove ProtectedMailboxStorageEntry, do the remove and signal listeners + doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfPayload); + printData("after removeMailboxData"); + + // Record the latest sequence number and persist it + sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis())); + sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300); + + maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload); + + broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner); + + return true; } private void maybeAddToRemoveAddOncePayloads(ProtectedStoragePayload protectedStoragePayload,