From 5b34c420372a4b7157a797ab2e4c4f6f1bd5b4a2 Mon Sep 17 00:00:00 2001 From: Garand Tyson Date: Thu, 27 Feb 2025 11:34:16 -0800 Subject: [PATCH] Unified iterating through txset history entries --- src/catchup/ApplyCheckpointWork.cpp | 66 ++++++++----------------- src/history/HistoryUtils.cpp | 60 ++++++++++++++++++++++ src/history/HistoryUtils.h | 26 ++++++++++ src/historywork/VerifyTxResultsWork.cpp | 54 +++++++------------- 4 files changed, 126 insertions(+), 80 deletions(-) create mode 100644 src/history/HistoryUtils.cpp create mode 100644 src/history/HistoryUtils.h diff --git a/src/catchup/ApplyCheckpointWork.cpp b/src/catchup/ApplyCheckpointWork.cpp index ff9a370301..55baac03cb 100644 --- a/src/catchup/ApplyCheckpointWork.cpp +++ b/src/catchup/ApplyCheckpointWork.cpp @@ -8,6 +8,7 @@ #include "catchup/ApplyLedgerWork.h" #include "history/FileTransferInfo.h" #include "history/HistoryManager.h" +#include "history/HistoryUtils.h" #include "historywork/Progress.h" #include "ledger/CheckpointRange.h" #include "ledger/LedgerManager.h" @@ -139,36 +140,22 @@ ApplyCheckpointWork::getCurrentTxSet() auto& lm = mApp.getLedgerManager(); auto seq = lm.getLastClosedLedgerNum() + 1; - // Check mTxHistoryEntry prior to loading next history entry. - // This order is important because it accounts for ledger "gaps" - // in the history archives (which are caused by ledgers with empty tx - // sets, as those are not uploaded). - do + auto foundEntry = getHistoryEntryForLedger( + mTxIn, mTxHistoryEntry, seq); + + if (foundEntry) { - if (mTxHistoryEntry.ledgerSeq < seq) - { - CLOG_DEBUG(History, "Skipping txset for ledger {}", - mTxHistoryEntry.ledgerSeq); - } - else if (mTxHistoryEntry.ledgerSeq > seq) + CLOG_DEBUG(History, "Loaded txset for ledger {}", seq); + if (mTxHistoryEntry.ext.v() == 0) { - break; + return TxSetXDRFrame::makeFromWire(mTxHistoryEntry.txSet); } else { - releaseAssert(mTxHistoryEntry.ledgerSeq == seq); - CLOG_DEBUG(History, "Loaded txset for ledger {}", seq); - if (mTxHistoryEntry.ext.v() == 0) - { - return TxSetXDRFrame::makeFromWire(mTxHistoryEntry.txSet); - } - else - { - return TxSetXDRFrame::makeFromWire( - mTxHistoryEntry.ext.generalizedTxSet()); - } + return TxSetXDRFrame::makeFromWire( + mTxHistoryEntry.ext.generalizedTxSet()); } - } while (mTxIn && mTxIn.readOne(mTxHistoryEntry)); + } CLOG_DEBUG(History, "Using empty txset for ledger {}", seq); return TxSetXDRFrame::makeEmpty(lm.getLastClosedLedgerHeader()); @@ -181,29 +168,18 @@ ApplyCheckpointWork::getCurrentTxResultSet() ZoneScoped; auto& lm = mApp.getLedgerManager(); auto seq = lm.getLastClosedLedgerNum() + 1; - // Check mTxResultSet prior to loading next result set. - // This order is important because it accounts for ledger "gaps" - // in the history archives (which are caused by ledgers with empty tx - // sets, as those are not uploaded). - while (mTxResultIn && mTxResultIn->readOne(*mTxHistoryResultEntry)) + releaseAssertOrThrow(mTxHistoryResultEntry); + + if (mTxResultIn) { - if (mTxHistoryResultEntry) + auto foundEntry = + getHistoryEntryForLedger( + *mTxResultIn, *mTxHistoryResultEntry, seq); + + if (foundEntry) { - if (mTxHistoryResultEntry->ledgerSeq < seq) - { - CLOG_DEBUG(History, "Advancing past txresultset for ledger {}", - mTxHistoryResultEntry->ledgerSeq); - } - else if (mTxHistoryResultEntry->ledgerSeq > seq) - { - break; - } - else - { - releaseAssert(mTxHistoryResultEntry->ledgerSeq == seq); - CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq); - return std::make_optional(mTxHistoryResultEntry->txResultSet); - } + CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq); + return std::make_optional(mTxHistoryResultEntry->txResultSet); } } CLOG_DEBUG(History, "No txresultset for ledger {}", seq); diff --git a/src/history/HistoryUtils.cpp b/src/history/HistoryUtils.cpp new file mode 100644 index 0000000000..a30b87042e --- /dev/null +++ b/src/history/HistoryUtils.cpp @@ -0,0 +1,60 @@ +// Copyright 2025 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "history/HistoryUtils.h" +#include "util/Logging.h" +#include "util/XDRStream.h" +#include "xdr/Stellar-ledger.h" +#include + +namespace stellar +{ + +template +bool +getHistoryEntryForLedger(XDRInputFileStream& stream, T& currentEntry, + uint32_t targetLedger, + std::function validateFn) +{ + ZoneScoped; + + auto readNextWithValidation = [&]() { + auto res = stream.readOne(currentEntry); + if (res && validateFn) + { + validateFn(currentEntry.ledgerSeq); + } + return res; + }; + + do + { + if (currentEntry.ledgerSeq < targetLedger) + { + CLOG_DEBUG(History, "Advancing past txhistory entry for ledger {}", + currentEntry.ledgerSeq); + } + else if (currentEntry.ledgerSeq > targetLedger) + { + // No entry for this ledger + break; + } + else + { + // Found the entry for our target ledger + return true; + } + } while (stream && readNextWithValidation()); + + return false; +} + +template bool getHistoryEntryForLedger( + XDRInputFileStream& stream, TransactionHistoryEntry& currentEntry, + uint32_t targetLedger, std::function validateFn); + +template bool getHistoryEntryForLedger( + XDRInputFileStream& stream, TransactionHistoryResultEntry& currentEntry, + uint32_t targetLedger, std::function validateFn); +} \ No newline at end of file diff --git a/src/history/HistoryUtils.h b/src/history/HistoryUtils.h new file mode 100644 index 0000000000..429a9f3d54 --- /dev/null +++ b/src/history/HistoryUtils.h @@ -0,0 +1,26 @@ +// Copyright 2025 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#pragma once + +#include +#include + +namespace stellar +{ + +class XDRInputFileStream; + +// This function centralizes the logic for iterating through +// history archive tx set entries (TransactionHistoryEntry, +// TransactionHistoryResultEntry) that may have gaps. Reads an entry from the +// stream into currentEntry until eof or an entry is found with ledgerSeq >= +// targetLedger. Returns true if targetLedger found (currentEntry will be set to +// the target ledger). Otherwise returns false, where currentEntry is the last +// entry read from the stream. +template +bool getHistoryEntryForLedger( + XDRInputFileStream& stream, T& currentEntry, uint32_t targetLedger, + std::function validateFn = nullptr); +} \ No newline at end of file diff --git a/src/historywork/VerifyTxResultsWork.cpp b/src/historywork/VerifyTxResultsWork.cpp index 14c4dc0891..d6ea669be5 100644 --- a/src/historywork/VerifyTxResultsWork.cpp +++ b/src/historywork/VerifyTxResultsWork.cpp @@ -4,6 +4,7 @@ #include "historywork/VerifyTxResultsWork.h" #include "history/FileTransferInfo.h" +#include "history/HistoryUtils.h" #include "ledger/LedgerManager.h" #include "main/ErrorMessages.h" #include "util/FileSystemException.h" @@ -144,47 +145,30 @@ VerifyTxResultsWork::getCurrentTxResultSet(uint32_t ledger) TransactionHistoryResultEntry trs; trs.ledgerSeq = ledger; - auto readNextWithValidation = [&]() { - auto res = mResIn.readOne(mTxResultEntry); - if (res) + auto validateFn = [this](uint32_t readLedger) { + auto low = HistoryManager::firstLedgerInCheckpointContaining( + mCheckpoint, mApp.getConfig()); + if (readLedger > mCheckpoint || readLedger < low) { - auto readLedger = mTxResultEntry.ledgerSeq; - auto low = HistoryManager::firstLedgerInCheckpointContaining( - mCheckpoint, mApp.getConfig()); - if (readLedger > mCheckpoint || readLedger < low) - { - throw std::runtime_error("Results outside of checkpoint range"); - } + throw std::runtime_error("Results outside of checkpoint range"); + } - if (readLedger <= mLastSeenLedger) - { - throw std::runtime_error("Malformed or duplicate results: " - "ledgers must be strictly increasing"); - } - mLastSeenLedger = readLedger; + if (readLedger <= mLastSeenLedger) + { + throw std::runtime_error("Malformed or duplicate results: " + "ledgers must be strictly increasing"); } - return res; + mLastSeenLedger = readLedger; }; - do + auto foundEntry = getHistoryEntryForLedger( + mResIn, mTxResultEntry, ledger, validateFn); + + if (foundEntry) { - if (mTxResultEntry.ledgerSeq < ledger) - { - CLOG_DEBUG(History, "Processed tx results for ledger {}", - mTxResultEntry.ledgerSeq); - } - else if (mTxResultEntry.ledgerSeq > ledger) - { - // No tx results in this ledger - break; - } - else - { - CLOG_DEBUG(History, "Loaded tx result set for ledger {}", ledger); - trs.txResultSet = mTxResultEntry.txResultSet; - return trs; - } - } while (mResIn && readNextWithValidation()); + CLOG_DEBUG(History, "Loaded tx result set for ledger {}", ledger); + trs.txResultSet = mTxResultEntry.txResultSet; + } return trs; }