Skip to content

Commit

Permalink
Unified iterating through txset history entries (#4655)
Browse files Browse the repository at this point in the history
# Description

Resolves #4567.

# Checklist
- [ ] Reviewed the
[contributing](https://github.com/stellar/stellar-core/blob/master/CONTRIBUTING.md#submitting-changes)
document
- [ ] Rebased on top of master (no merge commits)
- [ ] Ran `clang-format` v8.0.0 (via `make format` or the Visual Studio
extension)
- [ ] Compiles
- [ ] Ran all tests
- [ ] If change impacts performance, include supporting evidence per the
[performance
document](https://github.com/stellar/stellar-core/blob/master/performance-eval/performance-eval.md)
  • Loading branch information
marta-lokhova authored Feb 28, 2025
2 parents 2b2aef9 + 5b34c42 commit a299bf4
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 80 deletions.
66 changes: 21 additions & 45 deletions src/catchup/ApplyCheckpointWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "catchup/ApplyLedgerWork.h"
#include "history/FileTransferInfo.h"
#include "history/HistoryManager.h"
#include "history/HistoryUtils.h"
#include "historywork/Progress.h"
#include "ledger/CheckpointRange.h"
#include "ledger/LedgerManager.h"
Expand Down Expand Up @@ -139,36 +140,22 @@ ApplyCheckpointWork::getCurrentTxSet()
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;

// Check mTxHistoryEntry prior to loading next history entry.
// This order is important because it accounts for ledger "gaps"
// in the history archives (which are caused by ledgers with empty tx
// sets, as those are not uploaded).
do
auto foundEntry = getHistoryEntryForLedger<TransactionHistoryEntry>(
mTxIn, mTxHistoryEntry, seq);

if (foundEntry)
{
if (mTxHistoryEntry.ledgerSeq < seq)
{
CLOG_DEBUG(History, "Skipping txset for ledger {}",
mTxHistoryEntry.ledgerSeq);
}
else if (mTxHistoryEntry.ledgerSeq > seq)
CLOG_DEBUG(History, "Loaded txset for ledger {}", seq);
if (mTxHistoryEntry.ext.v() == 0)
{
break;
return TxSetXDRFrame::makeFromWire(mTxHistoryEntry.txSet);
}
else
{
releaseAssert(mTxHistoryEntry.ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txset for ledger {}", seq);
if (mTxHistoryEntry.ext.v() == 0)
{
return TxSetXDRFrame::makeFromWire(mTxHistoryEntry.txSet);
}
else
{
return TxSetXDRFrame::makeFromWire(
mTxHistoryEntry.ext.generalizedTxSet());
}
return TxSetXDRFrame::makeFromWire(
mTxHistoryEntry.ext.generalizedTxSet());
}
} while (mTxIn && mTxIn.readOne(mTxHistoryEntry));
}

CLOG_DEBUG(History, "Using empty txset for ledger {}", seq);
return TxSetXDRFrame::makeEmpty(lm.getLastClosedLedgerHeader());
Expand All @@ -181,29 +168,18 @@ ApplyCheckpointWork::getCurrentTxResultSet()
ZoneScoped;
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;
// Check mTxResultSet prior to loading next result set.
// This order is important because it accounts for ledger "gaps"
// in the history archives (which are caused by ledgers with empty tx
// sets, as those are not uploaded).
while (mTxResultIn && mTxResultIn->readOne(*mTxHistoryResultEntry))
releaseAssertOrThrow(mTxHistoryResultEntry);

if (mTxResultIn)
{
if (mTxHistoryResultEntry)
auto foundEntry =
getHistoryEntryForLedger<TransactionHistoryResultEntry>(
*mTxResultIn, *mTxHistoryResultEntry, seq);

if (foundEntry)
{
if (mTxHistoryResultEntry->ledgerSeq < seq)
{
CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
mTxHistoryResultEntry->ledgerSeq);
}
else if (mTxHistoryResultEntry->ledgerSeq > seq)
{
break;
}
else
{
releaseAssert(mTxHistoryResultEntry->ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry->txResultSet);
}
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry->txResultSet);
}
}
CLOG_DEBUG(History, "No txresultset for ledger {}", seq);
Expand Down
60 changes: 60 additions & 0 deletions src/history/HistoryUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2025 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "history/HistoryUtils.h"
#include "util/Logging.h"
#include "util/XDRStream.h"
#include "xdr/Stellar-ledger.h"
#include <Tracy.hpp>

namespace stellar
{

template <typename T>
bool
getHistoryEntryForLedger(XDRInputFileStream& stream, T& currentEntry,
uint32_t targetLedger,
std::function<void(uint32_t ledgerSeq)> validateFn)
{
ZoneScoped;

auto readNextWithValidation = [&]() {
auto res = stream.readOne(currentEntry);
if (res && validateFn)
{
validateFn(currentEntry.ledgerSeq);
}
return res;
};

do
{
if (currentEntry.ledgerSeq < targetLedger)
{
CLOG_DEBUG(History, "Advancing past txhistory entry for ledger {}",
currentEntry.ledgerSeq);
}
else if (currentEntry.ledgerSeq > targetLedger)
{
// No entry for this ledger
break;
}
else
{
// Found the entry for our target ledger
return true;
}
} while (stream && readNextWithValidation());

return false;
}

template bool getHistoryEntryForLedger<TransactionHistoryEntry>(
XDRInputFileStream& stream, TransactionHistoryEntry& currentEntry,
uint32_t targetLedger, std::function<void(uint32_t ledgerSeq)> validateFn);

template bool getHistoryEntryForLedger<TransactionHistoryResultEntry>(
XDRInputFileStream& stream, TransactionHistoryResultEntry& currentEntry,
uint32_t targetLedger, std::function<void(uint32_t ledgerSeq)> validateFn);
}
26 changes: 26 additions & 0 deletions src/history/HistoryUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#pragma once

#include <cstdint>
#include <functional>

namespace stellar
{

class XDRInputFileStream;

// This function centralizes the logic for iterating through
// history archive tx set entries (TransactionHistoryEntry,
// TransactionHistoryResultEntry) that may have gaps. Reads an entry from the
// stream into currentEntry until eof or an entry is found with ledgerSeq >=
// targetLedger. Returns true if targetLedger found (currentEntry will be set to
// the target ledger). Otherwise returns false, where currentEntry is the last
// entry read from the stream.
template <typename T>
bool getHistoryEntryForLedger(
XDRInputFileStream& stream, T& currentEntry, uint32_t targetLedger,
std::function<void(uint32_t ledgerSeq)> validateFn = nullptr);
}
54 changes: 19 additions & 35 deletions src/historywork/VerifyTxResultsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "historywork/VerifyTxResultsWork.h"
#include "history/FileTransferInfo.h"
#include "history/HistoryUtils.h"
#include "ledger/LedgerManager.h"
#include "main/ErrorMessages.h"
#include "util/FileSystemException.h"
Expand Down Expand Up @@ -144,47 +145,30 @@ VerifyTxResultsWork::getCurrentTxResultSet(uint32_t ledger)
TransactionHistoryResultEntry trs;
trs.ledgerSeq = ledger;

auto readNextWithValidation = [&]() {
auto res = mResIn.readOne(mTxResultEntry);
if (res)
auto validateFn = [this](uint32_t readLedger) {
auto low = HistoryManager::firstLedgerInCheckpointContaining(
mCheckpoint, mApp.getConfig());
if (readLedger > mCheckpoint || readLedger < low)
{
auto readLedger = mTxResultEntry.ledgerSeq;
auto low = HistoryManager::firstLedgerInCheckpointContaining(
mCheckpoint, mApp.getConfig());
if (readLedger > mCheckpoint || readLedger < low)
{
throw std::runtime_error("Results outside of checkpoint range");
}
throw std::runtime_error("Results outside of checkpoint range");
}

if (readLedger <= mLastSeenLedger)
{
throw std::runtime_error("Malformed or duplicate results: "
"ledgers must be strictly increasing");
}
mLastSeenLedger = readLedger;
if (readLedger <= mLastSeenLedger)
{
throw std::runtime_error("Malformed or duplicate results: "
"ledgers must be strictly increasing");
}
return res;
mLastSeenLedger = readLedger;
};

do
auto foundEntry = getHistoryEntryForLedger<TransactionHistoryResultEntry>(
mResIn, mTxResultEntry, ledger, validateFn);

if (foundEntry)
{
if (mTxResultEntry.ledgerSeq < ledger)
{
CLOG_DEBUG(History, "Processed tx results for ledger {}",
mTxResultEntry.ledgerSeq);
}
else if (mTxResultEntry.ledgerSeq > ledger)
{
// No tx results in this ledger
break;
}
else
{
CLOG_DEBUG(History, "Loaded tx result set for ledger {}", ledger);
trs.txResultSet = mTxResultEntry.txResultSet;
return trs;
}
} while (mResIn && readNextWithValidation());
CLOG_DEBUG(History, "Loaded tx result set for ledger {}", ledger);
trs.txResultSet = mTxResultEntry.txResultSet;
}

return trs;
}
Expand Down

0 comments on commit a299bf4

Please sign in to comment.