Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified iterating through txset history entries #4655

Merged
merged 1 commit into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 21 additions & 45 deletions src/catchup/ApplyCheckpointWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "catchup/ApplyLedgerWork.h"
#include "history/FileTransferInfo.h"
#include "history/HistoryManager.h"
#include "history/HistoryUtils.h"
#include "historywork/Progress.h"
#include "ledger/CheckpointRange.h"
#include "ledger/LedgerManager.h"
Expand Down Expand Up @@ -139,36 +140,22 @@ ApplyCheckpointWork::getCurrentTxSet()
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;

// Check mTxHistoryEntry prior to loading next history entry.
// This order is important because it accounts for ledger "gaps"
// in the history archives (which are caused by ledgers with empty tx
// sets, as those are not uploaded).
do
auto foundEntry = getHistoryEntryForLedger<TransactionHistoryEntry>(
mTxIn, mTxHistoryEntry, seq);

if (foundEntry)
{
if (mTxHistoryEntry.ledgerSeq < seq)
{
CLOG_DEBUG(History, "Skipping txset for ledger {}",
mTxHistoryEntry.ledgerSeq);
}
else if (mTxHistoryEntry.ledgerSeq > seq)
CLOG_DEBUG(History, "Loaded txset for ledger {}", seq);
if (mTxHistoryEntry.ext.v() == 0)
{
break;
return TxSetXDRFrame::makeFromWire(mTxHistoryEntry.txSet);
}
else
{
releaseAssert(mTxHistoryEntry.ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txset for ledger {}", seq);
if (mTxHistoryEntry.ext.v() == 0)
{
return TxSetXDRFrame::makeFromWire(mTxHistoryEntry.txSet);
}
else
{
return TxSetXDRFrame::makeFromWire(
mTxHistoryEntry.ext.generalizedTxSet());
}
return TxSetXDRFrame::makeFromWire(
mTxHistoryEntry.ext.generalizedTxSet());
}
} while (mTxIn && mTxIn.readOne(mTxHistoryEntry));
}

CLOG_DEBUG(History, "Using empty txset for ledger {}", seq);
return TxSetXDRFrame::makeEmpty(lm.getLastClosedLedgerHeader());
Expand All @@ -181,29 +168,18 @@ ApplyCheckpointWork::getCurrentTxResultSet()
ZoneScoped;
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;
// Check mTxResultSet prior to loading next result set.
// This order is important because it accounts for ledger "gaps"
// in the history archives (which are caused by ledgers with empty tx
// sets, as those are not uploaded).
while (mTxResultIn && mTxResultIn->readOne(*mTxHistoryResultEntry))
releaseAssertOrThrow(mTxHistoryResultEntry);

if (mTxResultIn)
{
if (mTxHistoryResultEntry)
auto foundEntry =
getHistoryEntryForLedger<TransactionHistoryResultEntry>(
*mTxResultIn, *mTxHistoryResultEntry, seq);

if (foundEntry)
{
if (mTxHistoryResultEntry->ledgerSeq < seq)
{
CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
mTxHistoryResultEntry->ledgerSeq);
}
else if (mTxHistoryResultEntry->ledgerSeq > seq)
{
break;
}
else
{
releaseAssert(mTxHistoryResultEntry->ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry->txResultSet);
}
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry->txResultSet);
}
}
CLOG_DEBUG(History, "No txresultset for ledger {}", seq);
Expand Down
60 changes: 60 additions & 0 deletions src/history/HistoryUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2025 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "history/HistoryUtils.h"
#include "util/Logging.h"
#include "util/XDRStream.h"
#include "xdr/Stellar-ledger.h"
#include <Tracy.hpp>

namespace stellar
{

template <typename T>
bool
getHistoryEntryForLedger(XDRInputFileStream& stream, T& currentEntry,
uint32_t targetLedger,
std::function<void(uint32_t ledgerSeq)> validateFn)
{
ZoneScoped;

auto readNextWithValidation = [&]() {
auto res = stream.readOne(currentEntry);
if (res && validateFn)
{
validateFn(currentEntry.ledgerSeq);
}
return res;
};

do
{
if (currentEntry.ledgerSeq < targetLedger)
{
CLOG_DEBUG(History, "Advancing past txhistory entry for ledger {}",
currentEntry.ledgerSeq);
}
else if (currentEntry.ledgerSeq > targetLedger)
{
// No entry for this ledger
break;
}
else
{
// Found the entry for our target ledger
return true;
}
} while (stream && readNextWithValidation());

return false;
}

template bool getHistoryEntryForLedger<TransactionHistoryEntry>(
XDRInputFileStream& stream, TransactionHistoryEntry& currentEntry,
uint32_t targetLedger, std::function<void(uint32_t ledgerSeq)> validateFn);

template bool getHistoryEntryForLedger<TransactionHistoryResultEntry>(
XDRInputFileStream& stream, TransactionHistoryResultEntry& currentEntry,
uint32_t targetLedger, std::function<void(uint32_t ledgerSeq)> validateFn);
}
26 changes: 26 additions & 0 deletions src/history/HistoryUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#pragma once

#include <cstdint>
#include <functional>

namespace stellar
{

class XDRInputFileStream;

// This function centralizes the logic for iterating through
// history archive tx set entries (TransactionHistoryEntry,
// TransactionHistoryResultEntry) that may have gaps. Reads an entry from the
// stream into currentEntry until eof or an entry is found with ledgerSeq >=
// targetLedger. Returns true if targetLedger found (currentEntry will be set to
// the target ledger). Otherwise returns false, where currentEntry is the last
// entry read from the stream.
template <typename T>
bool getHistoryEntryForLedger(
XDRInputFileStream& stream, T& currentEntry, uint32_t targetLedger,
std::function<void(uint32_t ledgerSeq)> validateFn = nullptr);
}
54 changes: 19 additions & 35 deletions src/historywork/VerifyTxResultsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "historywork/VerifyTxResultsWork.h"
#include "history/FileTransferInfo.h"
#include "history/HistoryUtils.h"
#include "ledger/LedgerManager.h"
#include "main/ErrorMessages.h"
#include "util/FileSystemException.h"
Expand Down Expand Up @@ -144,47 +145,30 @@ VerifyTxResultsWork::getCurrentTxResultSet(uint32_t ledger)
TransactionHistoryResultEntry trs;
trs.ledgerSeq = ledger;

auto readNextWithValidation = [&]() {
auto res = mResIn.readOne(mTxResultEntry);
if (res)
auto validateFn = [this](uint32_t readLedger) {
auto low = HistoryManager::firstLedgerInCheckpointContaining(
mCheckpoint, mApp.getConfig());
if (readLedger > mCheckpoint || readLedger < low)
{
auto readLedger = mTxResultEntry.ledgerSeq;
auto low = HistoryManager::firstLedgerInCheckpointContaining(
mCheckpoint, mApp.getConfig());
if (readLedger > mCheckpoint || readLedger < low)
{
throw std::runtime_error("Results outside of checkpoint range");
}
throw std::runtime_error("Results outside of checkpoint range");
}

if (readLedger <= mLastSeenLedger)
{
throw std::runtime_error("Malformed or duplicate results: "
"ledgers must be strictly increasing");
}
mLastSeenLedger = readLedger;
if (readLedger <= mLastSeenLedger)
{
throw std::runtime_error("Malformed or duplicate results: "
"ledgers must be strictly increasing");
}
return res;
mLastSeenLedger = readLedger;
};

do
auto foundEntry = getHistoryEntryForLedger<TransactionHistoryResultEntry>(
mResIn, mTxResultEntry, ledger, validateFn);

if (foundEntry)
{
if (mTxResultEntry.ledgerSeq < ledger)
{
CLOG_DEBUG(History, "Processed tx results for ledger {}",
mTxResultEntry.ledgerSeq);
}
else if (mTxResultEntry.ledgerSeq > ledger)
{
// No tx results in this ledger
break;
}
else
{
CLOG_DEBUG(History, "Loaded tx result set for ledger {}", ledger);
trs.txResultSet = mTxResultEntry.txResultSet;
return trs;
}
} while (mResIn && readNextWithValidation());
CLOG_DEBUG(History, "Loaded tx result set for ledger {}", ledger);
trs.txResultSet = mTxResultEntry.txResultSet;
}

return trs;
}
Expand Down