diff --git a/src/bucket/Bucket.cpp b/src/bucket/Bucket.cpp index 792039db17..59b5016844 100644 --- a/src/bucket/Bucket.cpp +++ b/src/bucket/Bucket.cpp @@ -77,35 +77,6 @@ Bucket::Bucket() { } -std::unique_ptr -Bucket::openStream() -{ - releaseAssertOrThrow(!mFilename.empty()); - auto streamPtr = std::make_unique(); - streamPtr->open(mFilename.string()); - return std::move(streamPtr); -} - -XDRInputFileStream& -Bucket::getIndexStream() -{ - if (!mIndexStream) - { - mIndexStream = openStream(); - } - return *mIndexStream; -} - -XDRInputFileStream& -Bucket::getEvictionStream() -{ - if (!mEvictionStream) - { - mEvictionStream = openStream(); - } - return *mEvictionStream; -} - Hash const& Bucket::getHash() const { @@ -156,90 +127,6 @@ void Bucket::freeIndex() { mIndex.reset(nullptr); - mIndexStream.reset(nullptr); -} - -std::optional -Bucket::getEntryAtOffset(LedgerKey const& k, std::streamoff pos, - size_t pageSize) -{ - ZoneScoped; - auto& stream = getIndexStream(); - stream.seek(pos); - - BucketEntry be; - if (pageSize == 0) - { - if (stream.readOne(be)) - { - return std::make_optional(be); - } - } - else if (stream.readPage(be, k, pageSize)) - { - return std::make_optional(be); - } - - // Mark entry miss for metrics - getIndex().markBloomMiss(); - return std::nullopt; -} - -std::optional -Bucket::getBucketEntry(LedgerKey const& k) -{ - ZoneScoped; - auto pos = getIndex().lookup(k); - if (pos.has_value()) - { - return getEntryAtOffset(k, pos.value(), getIndex().getPageSize()); - } - - return std::nullopt; -} - -// When searching for an entry, BucketList calls this function on every bucket. -// Since the input is sorted, we do a binary search for the first key in keys. -// If we find the entry, we remove the found key from keys so that later buckets -// do not load shadowed entries. If we don't find the entry, we do not remove it -// from keys so that it will be searched for again at a lower level. -void -Bucket::loadKeys(std::set& keys, - std::vector& result) -{ - ZoneScoped; - - auto currKeyIt = keys.begin(); - auto const& index = getIndex(); - auto indexIter = index.begin(); - while (currKeyIt != keys.end() && indexIter != index.end()) - { - auto [offOp, newIndexIter] = index.scan(indexIter, *currKeyIt); - indexIter = newIndexIter; - if (offOp) - { - auto entryOp = - getEntryAtOffset(*currKeyIt, *offOp, getIndex().getPageSize()); - if (entryOp) - { - if (entryOp->type() != DEADENTRY) - { - result.push_back(entryOp->liveEntry()); - } - - currKeyIt = keys.erase(currKeyIt); - continue; - } - } - - ++currKeyIt; - } -} - -std::vector const& -Bucket::getPoolIDsByAsset(Asset const& asset) const -{ - return getIndex().getPoolIDsByAsset(asset); } #ifdef BUILD_TESTS @@ -787,12 +674,12 @@ mergeCasesWithEqualKeys(MergeCounters& mc, BucketInputIterator& oi, } bool -Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter, - uint32_t& bytesToScan, - uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq, - medida::Counter& entriesEvictedCounter, - medida::Counter& bytesScannedForEvictionCounter, - std::optional& metrics) +Bucket::scanForEvictionLegacySQL( + AbstractLedgerTxn& ltx, EvictionIterator& iter, uint32_t& bytesToScan, + uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq, + medida::Counter& entriesEvictedCounter, + medida::Counter& bytesScannedForEvictionCounter, + std::optional& stats) const { ZoneScoped; if (isEmpty() || @@ -809,7 +696,8 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter, return true; } - auto& stream = getEvictionStream(); + XDRInputFileStream stream{}; + stream.open(mFilename); stream.seek(iter.bucketFileOffset); BucketEntry be; @@ -844,10 +732,10 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter, if (shouldEvict()) { ZoneNamedN(evict, "evict entry", true); - if (metrics.has_value()) + if (stats.has_value()) { - ++metrics->numEntriesEvicted; - metrics->evictedEntriesAgeSum += + ++stats->numEntriesEvicted; + stats->evictedEntriesAgeSum += ledgerSeq - liveUntilLedger; } diff --git a/src/bucket/Bucket.h b/src/bucket/Bucket.h index 67b7b99843..17b5951921 100644 --- a/src/bucket/Bucket.h +++ b/src/bucket/Bucket.h @@ -38,7 +38,7 @@ namespace stellar class AbstractLedgerTxn; class Application; class BucketManager; -struct EvictionMetrics; +struct EvictionStatistics; class Bucket : public std::enable_shared_from_this, public NonMovableOrCopyable @@ -49,33 +49,9 @@ class Bucket : public std::enable_shared_from_this, std::unique_ptr mIndex{}; - // Lazily-constructed and retained for read path, one for BucketListDB reads - // and one for eviction scans - std::unique_ptr mIndexStream; - std::unique_ptr mEvictionStream; - // Returns index, throws if index not yet initialized BucketIndex const& getIndex() const; - // Returns (lazily-constructed) file stream for bucketDB search. Note - // this might be in some random position left over from a previous read -- - // must be seek()'ed before use. - XDRInputFileStream& getIndexStream(); - - // Returns (lazily-constructed) file stream for eviction scans. Unlike the - // indexStream, this should retain its position in-between calls. However, a - // node performing catchup or joining the network may need to begin evicting - // mid-bucket, so this stream should still be seeked to the proper position - // before reading. - XDRInputFileStream& getEvictionStream(); - - // Loads the bucket entry for LedgerKey k. Starts at file offset pos and - // reads until key is found or the end of the page. - std::optional - getEntryAtOffset(LedgerKey const& k, std::streamoff pos, size_t pageSize); - - std::unique_ptr openStream(); - static std::string randomFileName(std::string const& tmpDir, std::string ext); @@ -109,18 +85,6 @@ class Bucket : public std::enable_shared_from_this, // Sets index, throws if index is already set void setIndex(std::unique_ptr&& index); - // Loads bucket entry for LedgerKey k. - std::optional getBucketEntry(LedgerKey const& k); - - // Loads LedgerEntry's for given keys. When a key is found, the - // entry is added to result and the key is removed from keys. - void loadKeys(std::set& keys, - std::vector& result); - - // Return all PoolIDs that contain the given asset on either side of the - // pool - std::vector const& getPoolIDsByAsset(Asset const& asset) const; - // At version 11, we added support for INITENTRY and METAENTRY. Before this // we were only supporting LIVEENTRY and DEADENTRY. static constexpr ProtocolVersion @@ -141,19 +105,6 @@ class Bucket : public std::enable_shared_from_this, static std::string randomBucketName(std::string const& tmpDir); static std::string randomBucketIndexName(std::string const& tmpDir); - // Returns false if eof reached or if Bucket protocol version < 20, true - // otherwise. Modifies iter as the bucket is scanned. Also modifies - // bytesToScan and remainingEntriesToEvict such that after this function - // returns: - // bytesToScan -= amount_bytes_scanned - // remainingEntriesToEvict -= entries_evicted - bool scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter, - uint32_t& bytesToScan, - uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq, - medida::Counter& entriesEvictedCounter, - medida::Counter& bytesScannedForEvictionCounter, - std::optional& metrics); - #ifdef BUILD_TESTS // "Applies" the bucket to the database. For each entry in the bucket, // if the entry is init or live, creates or updates the corresponding @@ -169,6 +120,18 @@ class Bucket : public std::enable_shared_from_this, #endif // BUILD_TESTS + // Returns false if eof reached, true otherwise. Modifies iter as the bucket + // is scanned. Also modifies bytesToScan and maxEntriesToEvict such that + // after this function returns: + // bytesToScan -= amount_bytes_scanned + // maxEntriesToEvict -= entries_evicted + bool scanForEvictionLegacySQL( + AbstractLedgerTxn& ltx, EvictionIterator& iter, uint32_t& bytesToScan, + uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq, + medida::Counter& entriesEvictedCounter, + medida::Counter& bytesScannedForEvictionCounter, + std::optional& stats) const; + // Create a fresh bucket from given vectors of init (created) and live // (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will // be sorted, hashed, and adopted in the provided BucketManager. @@ -201,5 +164,7 @@ class Bucket : public std::enable_shared_from_this, static uint32_t getBucketVersion(std::shared_ptr const& bucket); static uint32_t getBucketVersion(std::shared_ptr const& bucket); + + friend class BucketSnapshot; }; } diff --git a/src/bucket/BucketList.cpp b/src/bucket/BucketList.cpp index b58aa56983..a9a2bd857e 100644 --- a/src/bucket/BucketList.cpp +++ b/src/bucket/BucketList.cpp @@ -6,6 +6,7 @@ #include "bucket/Bucket.h" #include "bucket/BucketInputIterator.h" #include "bucket/BucketManager.h" +#include "bucket/BucketSnapshot.h" #include "bucket/LedgerCmp.h" #include "crypto/Hex.h" #include "crypto/Random.h" @@ -23,6 +24,7 @@ #include "util/types.h" #include "medida/counter.h" +#include "medida/metrics_registry.h" #include #include @@ -61,6 +63,7 @@ BucketLevel::getNext() void BucketLevel::setNext(FutureBucket const& fb) { + releaseAssert(threadIsMain()); mNextCurr = fb; } @@ -79,6 +82,7 @@ BucketLevel::getSnap() const void BucketLevel::setCurr(std::shared_ptr b) { + releaseAssert(threadIsMain()); mNextCurr.clear(); mCurr = b; } @@ -113,6 +117,7 @@ BucketList::shouldMergeWithEmptyCurr(uint32_t ledger, uint32_t level) void BucketLevel::setSnap(std::shared_ptr b) { + releaseAssert(threadIsMain()); mSnap = b; } @@ -376,182 +381,6 @@ BucketList::getHash() const return hsh.finish(); } -void -BucketList::loopAllBuckets(std::function)> f) const -{ - for (auto const& lev : mLevels) - { - std::array, 2> buckets = {lev.getCurr(), - lev.getSnap()}; - for (auto& b : buckets) - { - if (b->isEmpty()) - { - continue; - } - - if (f(b)) - { - return; - } - } - } -} - -std::shared_ptr -BucketList::getLedgerEntry(LedgerKey const& k) const -{ - ZoneScoped; - std::shared_ptr result{}; - - auto f = [&](std::shared_ptr b) { - auto be = b->getBucketEntry(k); - if (be.has_value()) - { - result = - be.value().type() == DEADENTRY - ? nullptr - : std::make_shared(be.value().liveEntry()); - return true; - } - else - { - return false; - } - }; - - loopAllBuckets(f); - return result; -} - -std::vector -BucketList::loadKeys(std::set const& inKeys) const -{ - ZoneScoped; - std::vector entries; - - // Make a copy of the key set, this loop is destructive - auto keys = inKeys; - auto f = [&](std::shared_ptr b) { - b->loadKeys(keys, entries); - return keys.empty(); - }; - - loopAllBuckets(f); - return entries; -} - -// This query has two steps: -// 1. For each bucket, determine what PoolIDs contain the target asset via the -// assetToPoolID index -// 2. Perform a bulk lookup for all possible trustline keys, that is, all -// trustlines with the given accountID and poolID from step 1 -std::vector -BucketList::loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID, - Asset const& asset) const -{ - ZoneScoped; - LedgerKeySet trustlinesToLoad; - - auto trustLineLoop = [&](std::shared_ptr b) { - for (auto const& poolID : b->getPoolIDsByAsset(asset)) - { - LedgerKey trustlineKey(TRUSTLINE); - trustlineKey.trustLine().accountID = accountID; - trustlineKey.trustLine().asset.type(ASSET_TYPE_POOL_SHARE); - trustlineKey.trustLine().asset.liquidityPoolID() = poolID; - trustlinesToLoad.emplace(trustlineKey); - } - - return false; // continue - }; - - loopAllBuckets(trustLineLoop); - return loadKeys(trustlinesToLoad); -} - -std::vector -BucketList::loadInflationWinners(size_t maxWinners, int64_t minBalance) const -{ - UnorderedMap voteCount; - UnorderedSet seen; - - auto countVotesInBucket = [&](std::shared_ptr b) { - for (BucketInputIterator in(b); in; ++in) - { - BucketEntry const& be = *in; - if (be.type() == DEADENTRY) - { - if (be.deadEntry().type() == ACCOUNT) - { - seen.insert(be.deadEntry().account().accountID); - } - continue; - } - - // Account are ordered first, so once we see a non-account entry, no - // other accounts are left in the bucket - LedgerEntry const& le = be.liveEntry(); - if (le.data.type() != ACCOUNT) - { - break; - } - - // Don't double count AccountEntry's seen in earlier levels - AccountEntry const& ae = le.data.account(); - AccountID const& id = ae.accountID; - if (!seen.insert(id).second) - { - continue; - } - - if (ae.inflationDest && ae.balance >= 1000000000) - { - voteCount[*ae.inflationDest] += ae.balance; - } - } - - return false; - }; - - loopAllBuckets(countVotesInBucket); - std::vector winners; - - // Check if we need to sort the voteCount by number of votes - if (voteCount.size() > maxWinners) - { - - // Sort Inflation winners by vote count in descending order - std::map::const_iterator, - std::greater> - voteCountSortedByCount; - for (auto iter = voteCount.cbegin(); iter != voteCount.cend(); ++iter) - { - voteCountSortedByCount[iter->second] = iter; - } - - // Insert first maxWinners entries that are larger thanminBalance - for (auto iter = voteCountSortedByCount.cbegin(); - winners.size() < maxWinners && iter->first >= minBalance; ++iter) - { - // push back {AccountID, voteCount} - winners.push_back({iter->second->first, iter->first}); - } - } - else - { - for (auto const& [id, count] : voteCount) - { - if (count >= minBalance) - { - winners.push_back({id, count}); - } - } - } - - return winners; -} - // levelShouldSpill is the set of boundaries at which each level should // spill, it's not-entirely obvious which numbers these are by inspection, // so we list the first 3 values it's true on each level here for reference: @@ -811,9 +640,9 @@ BucketList::addBatch(Application& app, uint32_t currLedger, } void -BucketList::scanForEviction(Application& app, AbstractLedgerTxn& ltx, - uint32_t ledgerSeq, - BucketListEvictionCounters& counters) +BucketList::scanForEvictionLegacySQL(Application& app, AbstractLedgerTxn& ltx, + uint32_t ledgerSeq, + EvictionCounters& counters) { auto getBucketFromIter = [&levels = mLevels](EvictionIterator const& iter) { auto& level = levels.at(iter.bucketListLevel); @@ -890,10 +719,10 @@ BucketList::scanForEviction(Application& app, AbstractLedgerTxn& ltx, counters.incompleteBucketScan.inc(); } - while (!b->scanForEviction( + while (!b->scanForEvictionLegacySQL( ltx, evictionIter, scanSize, maxEntriesToEvict, ledgerSeq, counters.entriesEvicted, counters.bytesScannedForEviction, - mEvictionMetrics)) + mEvictionStatistics)) { // If we reached eof in curr bucket, start scanning snap. // Last level has no snap so cycle back to the initial level. @@ -918,23 +747,24 @@ BucketList::scanForEviction(Application& app, AbstractLedgerTxn& ltx, // If eviction metrics are not null, we have accounted for a // complete cycle and should log the metrics - if (mEvictionMetrics) + if (mEvictionStatistics) { counters.evictionCyclePeriod.set_count( ledgerSeq - - mEvictionMetrics->evictionCycleStartLedger); + mEvictionStatistics->evictionCycleStartLedger); auto averageAge = - mEvictionMetrics->numEntriesEvicted == 0 + mEvictionStatistics->numEntriesEvicted == 0 ? 0 - : mEvictionMetrics->evictedEntriesAgeSum / - mEvictionMetrics->numEntriesEvicted; + : mEvictionStatistics->evictedEntriesAgeSum / + mEvictionStatistics->numEntriesEvicted; counters.averageEvictedEntryAge.set_count(averageAge); } // Reset metrics at beginning of new eviction cycle - mEvictionMetrics = std::make_optional(); - mEvictionMetrics->evictionCycleStartLedger = ledgerSeq; + mEvictionStatistics = + std::make_optional(); + mEvictionStatistics->evictionCycleStartLedger = ledgerSeq; } } @@ -1024,6 +854,20 @@ BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion, } } +EvictionCounters::EvictionCounters(Application& app) + : entriesEvicted(app.getMetrics().NewCounter( + {"state-archival", "eviction", "entries-evicted"})) + , bytesScannedForEviction(app.getMetrics().NewCounter( + {"state-archival", "eviction", "bytes-scanned"})) + , incompleteBucketScan(app.getMetrics().NewCounter( + {"state-archival", "eviction", "incomplete-scan"})) + , evictionCyclePeriod( + app.getMetrics().NewCounter({"state-archival", "eviction", "period"})) + , averageEvictedEntryAge( + app.getMetrics().NewCounter({"state-archival", "eviction", "age"})) +{ +} + BucketListDepth BucketList::kNumLevels = 11; BucketList::BucketList() diff --git a/src/bucket/BucketList.h b/src/bucket/BucketList.h index 00aa0536ad..45b4fb25fe 100644 --- a/src/bucket/BucketList.h +++ b/src/bucket/BucketList.h @@ -4,6 +4,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#include "bucket/Bucket.h" #include "bucket/FutureBucket.h" #include "bucket/LedgerCmp.h" #include "overlay/StellarXDR.h" @@ -14,7 +15,6 @@ namespace medida { -class Meter; class Counter; } @@ -352,7 +352,6 @@ class AbstractLedgerTxn; class Application; class Bucket; class Config; -struct BucketListEvictionCounters; struct InflationWinner; namespace testutil @@ -403,7 +402,7 @@ class BucketListDepth friend class testutil::BucketListDepthModifier; }; -struct EvictionMetrics +struct EvictionStatistics { // Evicted entry "age" is the delta between its liveUntilLedger and the // ledger when the entry is actually evicted @@ -412,6 +411,17 @@ struct EvictionMetrics uint32_t evictionCycleStartLedger{}; }; +struct EvictionCounters +{ + medida::Counter& entriesEvicted; + medida::Counter& bytesScannedForEviction; + medida::Counter& incompleteBucketScan; + medida::Counter& evictionCyclePeriod; + medida::Counter& averageEvictedEntryAge; + + EvictionCounters(Application& app); +}; + class BucketList { std::vector mLevels; @@ -419,12 +429,7 @@ class BucketList // To avoid noisy data, only count metrics that encompass a complete // eviction cycle. If a node joins the network mid cycle, metrics will be // nullopt and be initialized at the start of the next cycle. - std::optional mEvictionMetrics; - - // Loops through all buckets, starting with curr at level 0, then snap at - // level 0, etc. Calls f on each bucket. Exits early if function - // returns true - void loopAllBuckets(std::function)> f) const; + std::optional mEvictionStatistics; public: // Number of bucket levels in the bucketlist. Every bucketlist in the system @@ -481,17 +486,9 @@ class BucketList // of the concatenation of the hashes of the `curr` and `snap` buckets. Hash getHash() const; - std::shared_ptr getLedgerEntry(LedgerKey const& k) const; - - std::vector - loadKeys(std::set const& inKeys) const; - - std::vector - loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID, - Asset const& asset) const; - - std::vector loadInflationWinners(size_t maxWinners, - int64_t minBalance) const; + void scanForEvictionLegacySQL(Application& app, AbstractLedgerTxn& ltx, + uint32_t ledgerSeq, + EvictionCounters& counters); // Restart any merges that might be running on background worker threads, // merging buckets between levels. This needs to be called after forcing a @@ -537,9 +534,5 @@ class BucketList std::vector const& initEntries, std::vector const& liveEntries, std::vector const& deadEntries); - - void scanForEviction(Application& app, AbstractLedgerTxn& ltx, - uint32_t ledgerSeq, - BucketListEvictionCounters& counters); }; } diff --git a/src/bucket/BucketListSnapshot.cpp b/src/bucket/BucketListSnapshot.cpp new file mode 100644 index 0000000000..21036b0f55 --- /dev/null +++ b/src/bucket/BucketListSnapshot.cpp @@ -0,0 +1,292 @@ +// Copyright 2024 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "bucket/BucketListSnapshot.h" +#include "bucket/BucketInputIterator.h" +#include "crypto/SecretKey.h" +#include "ledger/LedgerTxn.h" + +#include "medida/meter.h" +#include "medida/metrics_registry.h" + +namespace stellar +{ + +BucketListSnapshot::BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq) + : mLedgerSeq(ledgerSeq) +{ + releaseAssert(threadIsMain()); + + for (uint32_t i = 0; i < BucketList::kNumLevels; ++i) + { + auto const& level = bl.getLevel(i); + mLevels.emplace_back(BucketLevelSnapshot(level)); + } +} + +BucketListSnapshot::BucketListSnapshot(BucketListSnapshot const& snapshot) + : mLevels(snapshot.mLevels), mLedgerSeq(snapshot.mLedgerSeq) +{ +} + +std::vector const& +BucketListSnapshot::getLevels() const +{ + return mLevels; +} + +uint32_t +BucketListSnapshot::getLedgerSeq() const +{ + return mLedgerSeq; +} + +void +SearchableBucketListSnapshot::loopAllBuckets( + std::function f) const +{ + releaseAssert(mSnapshot); + + for (auto const& lev : mSnapshot->getLevels()) + { + // Return true if we should exit loop early + auto processBucket = [f](BucketSnapshot const& b) { + if (b.isEmpty()) + { + return false; + } + + return f(b); + }; + + if (processBucket(lev.curr) || processBucket(lev.snap)) + { + return; + } + } +} + +std::shared_ptr +SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k) +{ + ZoneScoped; + mSnapshotManager.maybeUpdateSnapshot(mSnapshot); + + if (threadIsMain()) + { + auto timer = mSnapshotManager.getPointLoadTimer(k.type()).TimeScope(); + return getLedgerEntryInternal(k); + } + else + { + return getLedgerEntryInternal(k); + } +} + +std::shared_ptr +SearchableBucketListSnapshot::getLedgerEntryInternal(LedgerKey const& k) +{ + std::shared_ptr result{}; + + auto f = [&](BucketSnapshot const& b) { + auto be = b.getBucketEntry(k); + if (be.has_value()) + { + result = + be.value().type() == DEADENTRY + ? nullptr + : std::make_shared(be.value().liveEntry()); + return true; + } + else + { + return false; + } + }; + + loopAllBuckets(f); + return result; +} + +std::vector +SearchableBucketListSnapshot::loadKeysInternal( + std::set const& inKeys) +{ + std::vector entries; + + // Make a copy of the key set, this loop is destructive + auto keys = inKeys; + auto f = [&](BucketSnapshot const& b) { + b.loadKeys(keys, entries); + return keys.empty(); + }; + + loopAllBuckets(f); + return entries; +} + +std::vector +SearchableBucketListSnapshot::loadKeys( + std::set const& inKeys) +{ + ZoneScoped; + mSnapshotManager.maybeUpdateSnapshot(mSnapshot); + + if (threadIsMain()) + { + auto timer = + mSnapshotManager.recordBulkLoadMetrics("prefetch", inKeys.size()) + .TimeScope(); + return loadKeysInternal(inKeys); + } + else + { + return loadKeysInternal(inKeys); + } +} + +// This query has two steps: +// 1. For each bucket, determine what PoolIDs contain the target asset via the +// assetToPoolID index +// 2. Perform a bulk lookup for all possible trustline keys, that is, all +// trustlines with the given accountID and poolID from step 1 +std::vector +SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset( + AccountID const& accountID, Asset const& asset) +{ + ZoneScoped; + + // This query should only be called during TX apply + releaseAssert(threadIsMain()); + mSnapshotManager.maybeUpdateSnapshot(mSnapshot); + + LedgerKeySet trustlinesToLoad; + + auto trustLineLoop = [&](BucketSnapshot const& b) { + for (auto const& poolID : b.getPoolIDsByAsset(asset)) + { + LedgerKey trustlineKey(TRUSTLINE); + trustlineKey.trustLine().accountID = accountID; + trustlineKey.trustLine().asset.type(ASSET_TYPE_POOL_SHARE); + trustlineKey.trustLine().asset.liquidityPoolID() = poolID; + trustlinesToLoad.emplace(trustlineKey); + } + + return false; // continue + }; + + loopAllBuckets(trustLineLoop); + + auto timer = mSnapshotManager + .recordBulkLoadMetrics("poolshareTrustlines", + trustlinesToLoad.size()) + .TimeScope(); + return loadKeysInternal(trustlinesToLoad); +} + +std::vector +SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners, + int64_t minBalance) +{ + ZoneScoped; + mSnapshotManager.maybeUpdateSnapshot(mSnapshot); + + // This is a legacy query, should only be called by main thread during + // catchup + releaseAssert(threadIsMain()); + auto timer = mSnapshotManager.recordBulkLoadMetrics("inflationWinners", 0) + .TimeScope(); + + UnorderedMap voteCount; + UnorderedSet seen; + + auto countVotesInBucket = [&](BucketSnapshot const& b) { + for (BucketInputIterator in(b.getRawBucket()); in; ++in) + { + BucketEntry const& be = *in; + if (be.type() == DEADENTRY) + { + if (be.deadEntry().type() == ACCOUNT) + { + seen.insert(be.deadEntry().account().accountID); + } + continue; + } + + // Account are ordered first, so once we see a non-account entry, no + // other accounts are left in the bucket + LedgerEntry const& le = be.liveEntry(); + if (le.data.type() != ACCOUNT) + { + break; + } + + // Don't double count AccountEntry's seen in earlier levels + AccountEntry const& ae = le.data.account(); + AccountID const& id = ae.accountID; + if (!seen.insert(id).second) + { + continue; + } + + if (ae.inflationDest && ae.balance >= 1000000000) + { + voteCount[*ae.inflationDest] += ae.balance; + } + } + + return false; + }; + + loopAllBuckets(countVotesInBucket); + std::vector winners; + + // Check if we need to sort the voteCount by number of votes + if (voteCount.size() > maxWinners) + { + + // Sort Inflation winners by vote count in descending order + std::map::const_iterator, + std::greater> + voteCountSortedByCount; + for (auto iter = voteCount.cbegin(); iter != voteCount.cend(); ++iter) + { + voteCountSortedByCount[iter->second] = iter; + } + + // Insert first maxWinners entries that are larger thanminBalance + for (auto iter = voteCountSortedByCount.cbegin(); + winners.size() < maxWinners && iter->first >= minBalance; ++iter) + { + // push back {AccountID, voteCount} + winners.push_back({iter->second->first, iter->first}); + } + } + else + { + for (auto const& [id, count] : voteCount) + { + if (count >= minBalance) + { + winners.push_back({id, count}); + } + } + } + + return winners; +} + +BucketLevelSnapshot::BucketLevelSnapshot(BucketLevel const& level) + : curr(level.getCurr()), snap(level.getSnap()) +{ +} + +SearchableBucketListSnapshot::SearchableBucketListSnapshot( + BucketSnapshotManager const& snapshotManager) + : mSnapshotManager(snapshotManager) +{ + // Initialize snapshot from SnapshotManager + mSnapshotManager.maybeUpdateSnapshot(mSnapshot); +} +} \ No newline at end of file diff --git a/src/bucket/BucketListSnapshot.h b/src/bucket/BucketListSnapshot.h new file mode 100644 index 0000000000..7d9f05dfa9 --- /dev/null +++ b/src/bucket/BucketListSnapshot.h @@ -0,0 +1,91 @@ +#pragma once + +// Copyright 2024 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "bucket/BucketList.h" +#include "bucket/BucketManagerImpl.h" +#include "bucket/BucketSnapshot.h" +#include "bucket/BucketSnapshotManager.h" + +namespace medida +{ +class Timer; +} + +namespace stellar +{ + +struct BucketLevelSnapshot +{ + BucketSnapshot curr; + BucketSnapshot snap; + + BucketLevelSnapshot(BucketLevel const& level); +}; + +class BucketListSnapshot : public NonMovable +{ + private: + std::vector mLevels; + + // ledgerSeq that this BucketList snapshot is based off of + uint32_t mLedgerSeq; + + public: + BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq); + + // Only allow copies via constructor + BucketListSnapshot(BucketListSnapshot const& snapshot); + BucketListSnapshot& operator=(BucketListSnapshot const&) = delete; + + std::vector const& getLevels() const; + uint32_t getLedgerSeq() const; +}; + +// A lightweight wrapper around BucketListSnapshot for thread safe BucketListDB +// lookups. +// +// Any thread that needs to perform BucketList lookups should retrieve +// a single SearchableBucketListSnapshot instance from +// BucketListSnapshotManager. On each lookup, the SearchableBucketListSnapshot +// instance will check that the current snapshot is up to date via the +// BucketListSnapshotManager and will be refreshed accordingly. Callers can +// assume SearchableBucketListSnapshot is always up to date. +class SearchableBucketListSnapshot : public NonMovableOrCopyable +{ + BucketSnapshotManager const& mSnapshotManager; + + // Snapshot managed by SnapshotManager + std::unique_ptr mSnapshot{}; + + // Loops through all buckets, starting with curr at level 0, then snap at + // level 0, etc. Calls f on each bucket. Exits early if function + // returns true + void loopAllBuckets(std::function f) const; + + std::vector + loadKeysInternal(std::set const& inKeys); + + std::shared_ptr getLedgerEntryInternal(LedgerKey const& k); + + SearchableBucketListSnapshot(BucketSnapshotManager const& snapshotManager); + + friend std::unique_ptr + BucketSnapshotManager::getSearchableBucketListSnapshot() const; + + public: + std::vector + loadKeys(std::set const& inKeys); + + std::vector + loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID, + Asset const& asset); + + std::vector loadInflationWinners(size_t maxWinners, + int64_t minBalance); + + std::shared_ptr getLedgerEntry(LedgerKey const& k); +}; +} \ No newline at end of file diff --git a/src/bucket/BucketManager.h b/src/bucket/BucketManager.h index 6028eb924b..8000fc38b1 100644 --- a/src/bucket/BucketManager.h +++ b/src/bucket/BucketManager.h @@ -27,7 +27,9 @@ class AbstractLedgerTxn; class Application; class BasicWork; class BucketList; +class BucketSnapshotManager; class Config; +class SearchableBucketListSnapshot; class TmpDirManager; struct HistoryArchiveState; struct InflationWinner; @@ -78,17 +80,6 @@ struct MergeCounters bool operator==(MergeCounters const& other) const; }; -struct BucketListEvictionCounters -{ - medida::Counter& entriesEvicted; - medida::Counter& bytesScannedForEviction; - medida::Counter& incompleteBucketScan; - medida::Counter& evictionCyclePeriod; - medida::Counter& averageEvictedEntryAge; - - BucketListEvictionCounters(Application& app); -}; - /** * BucketManager is responsible for maintaining a collection of Buckets of * ledger entries (each sorted, de-duplicated and identified by hash) and, @@ -126,6 +117,7 @@ class BucketManager : NonMovableOrCopyable virtual TmpDirManager& getTmpDirManager() = 0; virtual std::string const& getBucketDir() const = 0; virtual BucketList& getBucketList() = 0; + virtual BucketSnapshotManager& getBucketSnapshotManager() const = 0; virtual bool renameBucketDirFile(std::filesystem::path const& src, std::filesystem::path const& dst) = 0; @@ -220,24 +212,8 @@ class BucketManager : NonMovableOrCopyable // Scans BucketList for non-live entries to evict starting at the entry // pointed to by EvictionIterator. Scans until `maxEntriesToEvict` entries // have been evicted or maxEvictionScanSize bytes have been scanned. - virtual void scanForEviction(AbstractLedgerTxn& ltx, - uint32_t ledgerSeq) = 0; - - // Look up a ledger entry from the BucketList. Returns nullopt if the LE is - // dead / nonexistent. - virtual std::shared_ptr - getLedgerEntry(LedgerKey const& k) const = 0; - - // Loads LedgerEntry for all keys. - virtual std::vector - loadKeys(std::set const& keys) const = 0; - - virtual std::vector - loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID, - Asset const& asset) const = 0; - - virtual std::vector - loadInflationWinners(size_t maxWinners, int64_t minBalance) const = 0; + virtual void scanForEvictionLegacySQL(AbstractLedgerTxn& ltx, + uint32_t ledgerSeq) = 0; virtual medida::Meter& getBloomMissMeter() const = 0; virtual medida::Meter& getBloomLookupMeter() const = 0; diff --git a/src/bucket/BucketManagerImpl.cpp b/src/bucket/BucketManagerImpl.cpp index de116cefa0..d5e62d3531 100644 --- a/src/bucket/BucketManagerImpl.cpp +++ b/src/bucket/BucketManagerImpl.cpp @@ -6,7 +6,9 @@ #include "bucket/Bucket.h" #include "bucket/BucketInputIterator.h" #include "bucket/BucketList.h" +#include "bucket/BucketListSnapshot.h" #include "bucket/BucketOutputIterator.h" +#include "bucket/BucketSnapshotManager.h" #include "crypto/Hex.h" #include "history/HistoryManager.h" #include "historywork/VerifyBucketWork.h" @@ -87,6 +89,13 @@ BucketManagerImpl::initialize() if (mApp.getConfig().MODE_ENABLES_BUCKETLIST) { mBucketList = std::make_unique(); + + if (mApp.getConfig().isUsingBucketListDB()) + { + mSnapshotManager = std::make_unique( + mApp.getMetrics(), + std::make_unique(*mBucketList, 0)); + } } } @@ -104,23 +113,10 @@ BucketManagerImpl::getTmpDirManager() return *mTmpDirManager; } -BucketListEvictionCounters::BucketListEvictionCounters(Application& app) - : entriesEvicted(app.getMetrics().NewCounter( - {"state-archival", "eviction", "entries-evicted"})) - , bytesScannedForEviction(app.getMetrics().NewCounter( - {"state-archival", "eviction", "bytes-scanned"})) - , incompleteBucketScan(app.getMetrics().NewCounter( - {"state-archival", "eviction", "incomplete-scan"})) - , evictionCyclePeriod( - app.getMetrics().NewCounter({"state-archival", "eviction", "period"})) - , averageEvictedEntryAge( - app.getMetrics().NewCounter({"state-archival", "eviction", "age"})) -{ -} - BucketManagerImpl::BucketManagerImpl(Application& app) : mApp(app) , mBucketList(nullptr) + , mSnapshotManager(nullptr) , mTmpDirManager(nullptr) , mWorkDir(nullptr) , mLockedBucketDir(nullptr) @@ -130,8 +126,6 @@ BucketManagerImpl::BucketManagerImpl(Application& app) , mBucketSnapMerge(app.getMetrics().NewTimer({"bucket", "snap", "merge"})) , mSharedBucketsSize( app.getMetrics().NewCounter({"bucket", "memory", "shared"})) - , mBucketListDBBulkLoadMeter(app.getMetrics().NewMeter( - {"bucketlistDB", "query", "loads"}, "query")) , mBucketListDBBloomMisses(app.getMetrics().NewMeter( {"bucketlistDB", "bloom", "misses"}, "bloom")) , mBucketListDBBloomLookups(app.getMetrics().NewMeter( @@ -275,6 +269,14 @@ BucketManagerImpl::getBucketList() return *mBucketList; } +BucketSnapshotManager& +BucketManagerImpl::getBucketSnapshotManager() const +{ + releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB()); + releaseAssert(mSnapshotManager); + return *mSnapshotManager; +} + medida::Timer& BucketManagerImpl::getMergeTimer() { @@ -840,6 +842,12 @@ BucketManagerImpl::addBatch(Application& app, uint32_t currLedger, mBucketList->addBatch(app, currLedger, currLedgerProtocol, initEntries, liveEntries, deadEntries); mBucketListSizeCounter.set_count(mBucketList->getSize()); + + if (app.getConfig().isUsingBucketListDB()) + { + mSnapshotManager->updateCurrentSnapshot( + std::make_unique(*mBucketList, currLedger)); + } } #ifdef BUILD_TESTS @@ -907,92 +915,18 @@ BucketManagerImpl::maybeSetIndex(std::shared_ptr b, } void -BucketManagerImpl::scanForEviction(AbstractLedgerTxn& ltx, uint32_t ledgerSeq) +BucketManagerImpl::scanForEvictionLegacySQL(AbstractLedgerTxn& ltx, + uint32_t ledgerSeq) { ZoneScoped; if (protocolVersionStartsFrom(ltx.getHeader().ledgerVersion, SOROBAN_PROTOCOL_VERSION)) { - mBucketList->scanForEviction(mApp, ltx, ledgerSeq, - mBucketListEvictionCounters); + mBucketList->scanForEvictionLegacySQL(mApp, ltx, ledgerSeq, + mBucketListEvictionCounters); } } -medida::Timer& -BucketManagerImpl::recordBulkLoadMetrics(std::string const& label, - size_t numEntries) const -{ - if (numEntries != 0) - { - mBucketListDBBulkLoadMeter.Mark(numEntries); - } - - auto iter = mBucketListDBBulkTimers.find(label); - if (iter == mBucketListDBBulkTimers.end()) - { - auto& metric = - mApp.getMetrics().NewTimer({"bucketlistDB", "bulk", label}); - iter = mBucketListDBBulkTimers.emplace(label, metric).first; - } - - return iter->second; -} - -medida::Timer& -BucketManagerImpl::getPointLoadTimer(LedgerEntryType t) const -{ - auto iter = mBucketListDBPointTimers.find(t); - if (iter == mBucketListDBPointTimers.end()) - { - auto const& label = xdr::xdr_traits::enum_name(t); - auto& metric = - mApp.getMetrics().NewTimer({"bucketlistDB", "point", label}); - iter = mBucketListDBPointTimers.emplace(t, metric).first; - } - - return iter->second; -} - -std::shared_ptr -BucketManagerImpl::getLedgerEntry(LedgerKey const& k) const -{ - releaseAssertOrThrow(getConfig().isUsingBucketListDB()); - auto timer = getPointLoadTimer(k.type()).TimeScope(); - return mBucketList->getLedgerEntry(k); -} - -std::vector -BucketManagerImpl::loadKeys( - std::set const& keys) const -{ - releaseAssertOrThrow(getConfig().isUsingBucketListDB()); - auto timer = recordBulkLoadMetrics("prefetch", keys.size()).TimeScope(); - return mBucketList->loadKeys(keys); -} - -std::vector -BucketManagerImpl::loadPoolShareTrustLinesByAccountAndAsset( - AccountID const& accountID, Asset const& asset) const -{ - releaseAssertOrThrow(getConfig().isUsingBucketListDB()); - // This query needs to do a linear scan of certain regions of the - // BucketList, so the number of entries loaded is meaningless - auto timer = recordBulkLoadMetrics("poolshareTrustlines", 0).TimeScope(); - return mBucketList->loadPoolShareTrustLinesByAccountAndAsset(accountID, - asset); -} - -std::vector -BucketManagerImpl::loadInflationWinners(size_t maxWinners, - int64_t minBalance) const -{ - releaseAssertOrThrow(getConfig().isUsingBucketListDB()); - // This query needs to do a linear scan of certain regions of the - // BucketList, so the number of entries loaded is meaningless - auto timer = recordBulkLoadMetrics("inflationWinners", 0).TimeScope(); - return mBucketList->loadInflationWinners(maxWinners, minBalance); -} - medida::Meter& BucketManagerImpl::getBloomMissMeter() const { @@ -1059,8 +993,8 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has, auto snap = getBucketByHash(hexToBin256(has.currentBuckets.at(i).snap)); if (!(curr && snap)) { - throw std::runtime_error( - "Missing bucket files while assuming saved BucketList state"); + throw std::runtime_error("Missing bucket files while assuming " + "saved BucketList state"); } auto const& nextFuture = has.currentBuckets.at(i).next; @@ -1076,8 +1010,8 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has, } } - // Buckets on the BucketList should always be indexed when BucketListDB - // enabled + // Buckets on the BucketList should always be indexed when + // BucketListDB enabled if (mApp.getConfig().isUsingBucketListDB()) { releaseAssert(curr->isEmpty() || curr->isIndexed()); @@ -1097,6 +1031,13 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has, { mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger); } + + if (mApp.getConfig().isUsingBucketListDB()) + { + mSnapshotManager->updateCurrentSnapshot( + std::make_unique(*mBucketList, + has.currentLedger)); + } cleanupStaleFiles(); } diff --git a/src/bucket/BucketManagerImpl.h b/src/bucket/BucketManagerImpl.h index c68a17ea43..60ba0dcc76 100644 --- a/src/bucket/BucketManagerImpl.h +++ b/src/bucket/BucketManagerImpl.h @@ -30,6 +30,7 @@ class AbstractLedgerTxn; class Application; class Bucket; class BucketList; +class BucketSnapshotManager; struct HistoryArchiveState; class BucketManagerImpl : public BucketManager @@ -38,23 +39,24 @@ class BucketManagerImpl : public BucketManager Application& mApp; std::unique_ptr mBucketList; + std::unique_ptr mSnapshotManager; std::unique_ptr mTmpDirManager; std::unique_ptr mWorkDir; std::map> mSharedBuckets; + + // Lock for managing raw Bucket files or the bucket directory. This lock is + // only required for file access, but is not required for logical changes to + // the BucketList (i.e. addBatch). mutable std::recursive_mutex mBucketMutex; std::unique_ptr mLockedBucketDir; medida::Meter& mBucketObjectInsertBatch; medida::Timer& mBucketAddBatch; medida::Timer& mBucketSnapMerge; medida::Counter& mSharedBucketsSize; - medida::Meter& mBucketListDBBulkLoadMeter; medida::Meter& mBucketListDBBloomMisses; medida::Meter& mBucketListDBBloomLookups; medida::Counter& mBucketListSizeCounter; - BucketListEvictionCounters mBucketListEvictionCounters; - mutable UnorderedMap - mBucketListDBPointTimers{}; - mutable UnorderedMap mBucketListDBBulkTimers{}; + EvictionCounters mBucketListEvictionCounters; MergeCounters mMergeCounters; bool const mDeleteEntireBucketDirInDtor; @@ -104,6 +106,7 @@ class BucketManagerImpl : public BucketManager std::string const& getTmpDir() override; std::string const& getBucketDir() const override; BucketList& getBucketList() override; + BucketSnapshotManager& getBucketSnapshotManager() const; medida::Timer& getMergeTimer() override; MergeCounters readMergeCounters() override; void incrMergeCounters(MergeCounters const&) override; @@ -135,17 +138,9 @@ class BucketManagerImpl : public BucketManager void snapshotLedger(LedgerHeader& currentHeader) override; void maybeSetIndex(std::shared_ptr b, std::unique_ptr&& index) override; - void scanForEviction(AbstractLedgerTxn& ltx, uint32_t ledgerSeq) override; - - std::shared_ptr - getLedgerEntry(LedgerKey const& k) const override; - std::vector - loadKeys(std::set const& keys) const override; - std::vector - loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID, - Asset const& asset) const override; - std::vector - loadInflationWinners(size_t maxWinners, int64_t minBalance) const override; + void scanForEvictionLegacySQL(AbstractLedgerTxn& ltx, + uint32_t ledgerSeq) override; + medida::Meter& getBloomMissMeter() const override; medida::Meter& getBloomLookupMeter() const override; diff --git a/src/bucket/BucketSnapshot.cpp b/src/bucket/BucketSnapshot.cpp new file mode 100644 index 0000000000..a9b8d2fb9c --- /dev/null +++ b/src/bucket/BucketSnapshot.cpp @@ -0,0 +1,155 @@ +// Copyright 2024 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "bucket/BucketSnapshot.h" +#include "bucket/Bucket.h" +#include "bucket/BucketListSnapshot.h" +#include "ledger/LedgerTxn.h" +#include "ledger/LedgerTypeUtils.h" + +#include "medida/counter.h" + +namespace stellar +{ +BucketSnapshot::BucketSnapshot(std::shared_ptr const b) + : mBucket(b) +{ + releaseAssert(mBucket); +} + +BucketSnapshot::BucketSnapshot(BucketSnapshot const& b) + : mBucket(b.mBucket), mStream(nullptr) +{ + releaseAssert(mBucket); +} + +bool +BucketSnapshot::isEmpty() const +{ + releaseAssert(mBucket); + return mBucket->isEmpty(); +} + +std::optional +BucketSnapshot::getEntryAtOffset(LedgerKey const& k, std::streamoff pos, + size_t pageSize) const +{ + ZoneScoped; + if (isEmpty()) + { + return std::nullopt; + } + + auto& stream = getStream(); + stream.seek(pos); + + BucketEntry be; + if (pageSize == 0) + { + if (stream.readOne(be)) + { + return std::make_optional(be); + } + } + else if (stream.readPage(be, k, pageSize)) + { + return std::make_optional(be); + } + + // Mark entry miss for metrics + mBucket->getIndex().markBloomMiss(); + return std::nullopt; +} + +std::optional +BucketSnapshot::getBucketEntry(LedgerKey const& k) const +{ + ZoneScoped; + if (isEmpty()) + { + return std::nullopt; + } + + auto pos = mBucket->getIndex().lookup(k); + if (pos.has_value()) + { + return getEntryAtOffset(k, pos.value(), + mBucket->getIndex().getPageSize()); + } + + return std::nullopt; +} + +// When searching for an entry, BucketList calls this function on every bucket. +// Since the input is sorted, we do a binary search for the first key in keys. +// If we find the entry, we remove the found key from keys so that later buckets +// do not load shadowed entries. If we don't find the entry, we do not remove it +// from keys so that it will be searched for again at a lower level. +void +BucketSnapshot::loadKeys(std::set& keys, + std::vector& result) const +{ + ZoneScoped; + if (isEmpty()) + { + return; + } + + auto currKeyIt = keys.begin(); + auto const& index = mBucket->getIndex(); + auto indexIter = index.begin(); + while (currKeyIt != keys.end() && indexIter != index.end()) + { + auto [offOp, newIndexIter] = index.scan(indexIter, *currKeyIt); + indexIter = newIndexIter; + if (offOp) + { + auto entryOp = getEntryAtOffset(*currKeyIt, *offOp, + mBucket->getIndex().getPageSize()); + if (entryOp) + { + if (entryOp->type() != DEADENTRY) + { + result.push_back(entryOp->liveEntry()); + } + + currKeyIt = keys.erase(currKeyIt); + continue; + } + } + + ++currKeyIt; + } +} + +std::vector const& +BucketSnapshot::getPoolIDsByAsset(Asset const& asset) const +{ + static std::vector const emptyVec = {}; + if (isEmpty()) + { + return emptyVec; + } + + return mBucket->getIndex().getPoolIDsByAsset(asset); +} + +XDRInputFileStream& +BucketSnapshot::getStream() const +{ + releaseAssertOrThrow(!isEmpty()); + if (!mStream) + { + mStream = std::make_unique(); + mStream->open(mBucket->getFilename().string()); + } + return *mStream; +} + +std::shared_ptr +BucketSnapshot::getRawBucket() const +{ + return mBucket; +} +} \ No newline at end of file diff --git a/src/bucket/BucketSnapshot.h b/src/bucket/BucketSnapshot.h new file mode 100644 index 0000000000..0b54a2ace6 --- /dev/null +++ b/src/bucket/BucketSnapshot.h @@ -0,0 +1,63 @@ +#pragma once + +// Copyright 2024 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "util/NonCopyable.h" +#include "util/UnorderedMap.h" +#include "util/UnorderedSet.h" +#include "util/types.h" + +#include + +namespace stellar +{ + +class Bucket; +class XDRInputFileStream; + +// A lightweight wrapper around Bucket for thread safe BucketListDB lookups +class BucketSnapshot : public NonMovable +{ + std::shared_ptr const mBucket; + + // Lazily-constructed and retained for read path. + mutable std::unique_ptr mStream{}; + + // Returns (lazily-constructed) file stream for bucket file. Note + // this might be in some random position left over from a previous read -- + // must be seek()'ed before use. + XDRInputFileStream& getStream() const; + + // Loads the bucket entry for LedgerKey k. Starts at file offset pos and + // reads until key is found or the end of the page. + std::optional getEntryAtOffset(LedgerKey const& k, + std::streamoff pos, + size_t pageSize) const; + + BucketSnapshot(std::shared_ptr const b); + + // Only allow copy constructor, is threadsafe + BucketSnapshot(BucketSnapshot const& b); + BucketSnapshot& operator=(BucketSnapshot const&) = delete; + + public: + bool isEmpty() const; + std::shared_ptr getRawBucket() const; + + // Loads bucket entry for LedgerKey k. + std::optional getBucketEntry(LedgerKey const& k) const; + + // Loads LedgerEntry's for given keys. When a key is found, the + // entry is added to result and the key is removed from keys. + void loadKeys(std::set& keys, + std::vector& result) const; + + // Return all PoolIDs that contain the given asset on either side of the + // pool + std::vector const& getPoolIDsByAsset(Asset const& asset) const; + + friend struct BucketLevelSnapshot; +}; +} \ No newline at end of file diff --git a/src/bucket/BucketSnapshotManager.cpp b/src/bucket/BucketSnapshotManager.cpp new file mode 100644 index 0000000000..04a72fb03d --- /dev/null +++ b/src/bucket/BucketSnapshotManager.cpp @@ -0,0 +1,105 @@ +// Copyright 2024 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "bucket/BucketSnapshotManager.h" +#include "bucket/BucketListSnapshot.h" +#include "main/Application.h" + +#include "medida/meter.h" +#include "medida/metrics_registry.h" + +namespace stellar +{ + +BucketSnapshotManager::BucketSnapshotManager( + medida::MetricsRegistry& metrics, + std::unique_ptr&& snapshot) + : mMetrics(metrics) + , mCurrentSnapshot(std::move(snapshot)) + , mBulkLoadMeter( + mMetrics.NewMeter({"bucketlistDB", "query", "loads"}, "query")) + , mBloomMisses( + mMetrics.NewMeter({"bucketlistDB", "bloom", "misses"}, "bloom")) + , mBloomLookups( + mMetrics.NewMeter({"bucketlistDB", "bloom", "lookups"}, "bloom")) +{ + releaseAssert(threadIsMain()); +} + +std::unique_ptr +BucketSnapshotManager::getSearchableBucketListSnapshot() const +{ + // Can't use std::make_unique due to private constructor + return std::unique_ptr( + new SearchableBucketListSnapshot(*this)); +} + +medida::Timer& +BucketSnapshotManager::recordBulkLoadMetrics(std::string const& label, + size_t numEntries) const +{ + // For now, only keep metrics for the main thread. We can decide on what + // metrics make sense when more background services are added later. + releaseAssert(threadIsMain()); + + if (numEntries != 0) + { + mBulkLoadMeter.Mark(numEntries); + } + + auto iter = mBulkTimers.find(label); + if (iter == mBulkTimers.end()) + { + auto& metric = mMetrics.NewTimer({"bucketlistDB", "bulk", label}); + iter = mBulkTimers.emplace(label, metric).first; + } + + return iter->second; +} + +medida::Timer& +BucketSnapshotManager::getPointLoadTimer(LedgerEntryType t) const +{ + // For now, only keep metrics for the main thread. We can decide on what + // metrics make sense when more background services are added later. + releaseAssert(threadIsMain()); + + auto iter = mPointTimers.find(t); + if (iter == mPointTimers.end()) + { + auto const& label = xdr::xdr_traits::enum_name(t); + auto& metric = mMetrics.NewTimer({"bucketlistDB", "point", label}); + iter = mPointTimers.emplace(t, metric).first; + } + + return iter->second; +} + +void +BucketSnapshotManager::maybeUpdateSnapshot( + std::unique_ptr& snapshot) const +{ + std::lock_guard lock(mSnapshotMutex); + if (!snapshot || + snapshot->getLedgerSeq() != mCurrentSnapshot->getLedgerSeq()) + { + // Should only update with a newer snapshot + releaseAssert(!snapshot || snapshot->getLedgerSeq() < + mCurrentSnapshot->getLedgerSeq()); + snapshot = std::make_unique(*mCurrentSnapshot); + } +} + +void +BucketSnapshotManager::updateCurrentSnapshot( + std::unique_ptr&& newSnapshot) +{ + releaseAssert(newSnapshot); + releaseAssert(threadIsMain()); + std::lock_guard lock(mSnapshotMutex); + releaseAssert(!mCurrentSnapshot || newSnapshot->getLedgerSeq() >= + mCurrentSnapshot->getLedgerSeq()); + mCurrentSnapshot.swap(newSnapshot); +} +} \ No newline at end of file diff --git a/src/bucket/BucketSnapshotManager.h b/src/bucket/BucketSnapshotManager.h new file mode 100644 index 0000000000..1c368efbc0 --- /dev/null +++ b/src/bucket/BucketSnapshotManager.h @@ -0,0 +1,84 @@ +#pragma once + +// Copyright 2024 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "bucket/BucketManagerImpl.h" +#include "util/NonCopyable.h" +#include "util/UnorderedMap.h" +#include "util/types.h" + +#include +#include + +namespace medida +{ +class Meter; +class MetricsRegistry; +class Timer; +} + +namespace stellar +{ + +class Application; +class BucketList; +class BucketListSnapshot; + +// This class serves as the boundary between non-threadsafe singleton classes +// (BucketManager, BucketList, Metrics, etc) and threadsafe, parallel BucketList +// snapshots. +class BucketSnapshotManager : NonMovableOrCopyable +{ + private: + medida::MetricsRegistry& mMetrics; + + // Snapshot that is maintained and periodically updated by BucketManager on + // the main thread. When background threads need to generate or refresh a + // snapshot, they will copy this snapshot. + std::unique_ptr mCurrentSnapshot{}; + + // Lock must be held when accessing mCurrentSnapshot + mutable std::recursive_mutex mSnapshotMutex; + + mutable UnorderedMap mPointTimers{}; + mutable UnorderedMap mBulkTimers{}; + + medida::Meter& mBulkLoadMeter; + medida::Meter& mBloomMisses; + medida::Meter& mBloomLookups; + + // Called by main thread to update mCurrentSnapshot whenever the BucketList + // is updated + void updateCurrentSnapshot( + std::unique_ptr&& newSnapshot); + + friend void + BucketManagerImpl::addBatch(Application& app, uint32_t currLedger, + uint32_t currLedgerProtocol, + std::vector const& initEntries, + std::vector const& liveEntries, + std::vector const& deadEntries); + friend void BucketManagerImpl::assumeState(HistoryArchiveState const& has, + uint32_t maxProtocolVersion, + bool restartMerges); + + public: + BucketSnapshotManager(medida::MetricsRegistry& metrics, + std::unique_ptr&& snapshot); + + std::unique_ptr + getSearchableBucketListSnapshot() const; + + // Checks if snapshot is out of date with mCurrentSnapshot and updates + // it accordingly + void maybeUpdateSnapshot( + std::unique_ptr& snapshot) const; + + medida::Timer& recordBulkLoadMetrics(std::string const& label, + size_t numEntries) const; + + medida::Timer& getPointLoadTimer(LedgerEntryType t) const; +}; +} \ No newline at end of file diff --git a/src/bucket/test/BucketIndexTests.cpp b/src/bucket/test/BucketIndexTests.cpp index ed505e41e4..241581b6a7 100644 --- a/src/bucket/test/BucketIndexTests.cpp +++ b/src/bucket/test/BucketIndexTests.cpp @@ -7,6 +7,7 @@ #include "bucket/BucketIndexImpl.h" #include "bucket/BucketList.h" +#include "bucket/BucketListSnapshot.h" #include "bucket/BucketManager.h" #include "bucket/test/BucketTestUtils.h" #include "ledger/test/LedgerTestUtils.h" @@ -197,15 +198,19 @@ class BucketIndexTest virtual void run() { + auto searchableBL = getBM() + .getBucketSnapshotManager() + .getSearchableBucketListSnapshot(); + // Test bulk load lookup - auto loadResult = getBM().loadKeys(mKeysToSearch); + auto loadResult = searchableBL->loadKeys(mKeysToSearch); validateResults(mTestEntries, loadResult); // Test individual entry lookup loadResult.clear(); for (auto const& key : mKeysToSearch) { - auto entryPtr = getBM().getLedgerEntry(key); + auto entryPtr = searchableBL->getLedgerEntry(key); if (entryPtr) { loadResult.emplace_back(*entryPtr); @@ -219,6 +224,9 @@ class BucketIndexTest virtual void runPerf(size_t n) { + auto searchableBL = getBM() + .getBucketSnapshotManager() + .getSearchableBucketListSnapshot(); for (size_t i = 0; i < n; ++i) { LedgerKeySet searchSubset; @@ -247,7 +255,7 @@ class BucketIndexTest searchSubset.insert(addKeys.begin(), addKeys.end()); } - auto blLoad = getBM().loadKeys(searchSubset); + auto blLoad = searchableBL->loadKeys(searchSubset); validateResults(testEntriesSubset, blLoad); } } @@ -255,6 +263,10 @@ class BucketIndexTest void testInvalidKeys() { + auto searchableBL = getBM() + .getBucketSnapshotManager() + .getSearchableBucketListSnapshot(); + // Load should return empty vector for keys not in bucket list auto keysNotInBL = LedgerTestUtils::generateValidLedgerEntryKeysWithExclusions( @@ -262,12 +274,12 @@ class BucketIndexTest LedgerKeySet invalidKeys(keysNotInBL.begin(), keysNotInBL.end()); // Test bulk load - REQUIRE(getBM().loadKeys(invalidKeys).size() == 0); + REQUIRE(searchableBL->loadKeys(invalidKeys).size() == 0); // Test individual load for (auto const& key : invalidKeys) { - auto entryPtr = getBM().getLedgerEntry(key); + auto entryPtr = searchableBL->getLedgerEntry(key); REQUIRE(!entryPtr); } } @@ -427,8 +439,12 @@ class BucketIndexPoolShareTest : public BucketIndexTest virtual void run() override { - auto loadResult = getBM().loadPoolShareTrustLinesByAccountAndAsset( - mAccountToSearch.accountID, mAssetToSearch); + auto searchableBL = getBM() + .getBucketSnapshotManager() + .getSearchableBucketListSnapshot(); + auto loadResult = + searchableBL->loadPoolShareTrustLinesByAccountAndAsset( + mAccountToSearch.accountID, mAssetToSearch); validateResults(mTestEntries, loadResult); } }; diff --git a/src/bucket/test/BucketListTests.cpp b/src/bucket/test/BucketListTests.cpp index 5d493fc58f..2e48788521 100644 --- a/src/bucket/test/BucketListTests.cpp +++ b/src/bucket/test/BucketListTests.cpp @@ -13,6 +13,7 @@ #include "bucket/Bucket.h" #include "bucket/BucketInputIterator.h" #include "bucket/BucketList.h" +#include "bucket/BucketListSnapshot.h" #include "bucket/BucketManager.h" #include "bucket/BucketOutputIterator.h" #include "bucket/test/BucketTestUtils.h" @@ -1114,6 +1115,45 @@ TEST_CASE_VERSIONS("eviction scan", "[bucketlist]") }); } +TEST_CASE_VERSIONS("Searchable BucketListDB snapshots", "[bucketlist]") +{ + VirtualClock clock; + Config cfg(getTestConfig(0, Config::TESTDB_IN_MEMORY_SQLITE)); + cfg.EXPERIMENTAL_BUCKETLIST_DB = true; + + auto app = createTestApplication(clock, cfg); + LedgerManagerForBucketTests& lm = app->getLedgerManager(); + auto& bm = app->getBucketManager(); + + auto entry = + LedgerTestUtils::generateValidLedgerEntryOfType(CLAIMABLE_BALANCE); + entry.data.claimableBalance().amount = 0; + + auto searchableBL = + bm.getBucketSnapshotManager().getSearchableBucketListSnapshot(); + + // Update entry every 5 ledgers so we can see bucket merge events + for (auto ledgerSeq = 1; ledgerSeq < 101; ++ledgerSeq) + { + if ((ledgerSeq - 1) % 5 == 0) + { + ++entry.data.claimableBalance().amount; + entry.lastModifiedLedgerSeq = ledgerSeq; + lm.setNextLedgerEntryBatchForBucketTesting({}, {entry}, {}); + } + else + { + lm.setNextLedgerEntryBatchForBucketTesting({}, {}, {}); + } + + closeLedger(*app); + + // Snapshot should automatically update with latest version + auto loadedEntry = searchableBL->getLedgerEntry(LedgerEntryKey(entry)); + REQUIRE((loadedEntry && *loadedEntry == entry)); + } +} + static std::string formatX32(uint32_t v) { diff --git a/src/bucket/test/BucketTestUtils.cpp b/src/bucket/test/BucketTestUtils.cpp index e362f1d345..0fa39d7cb3 100644 --- a/src/bucket/test/BucketTestUtils.cpp +++ b/src/bucket/test/BucketTestUtils.cpp @@ -119,8 +119,8 @@ LedgerManagerForBucketTests::transferLedgerEntriesToBucketList( { { LedgerTxn ltxEvictions(ltx); - mApp.getBucketManager().scanForEviction(ltxEvictions, - ledgerSeq); + mApp.getBucketManager().scanForEvictionLegacySQL(ltxEvictions, + ledgerSeq); if (ledgerCloseMeta) { ledgerCloseMeta->populateEvictedEntries( diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index c914a5193b..7b7b9c832e 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -1624,7 +1624,8 @@ LedgerManagerImpl::transferLedgerEntriesToBucketList( { { LedgerTxn ltxEvictions(ltx); - mApp.getBucketManager().scanForEviction(ltxEvictions, ledgerSeq); + mApp.getBucketManager().scanForEvictionLegacySQL(ltxEvictions, + ledgerSeq); if (ledgerCloseMeta) { ledgerCloseMeta->populateEvictedEntries( diff --git a/src/ledger/LedgerTxn.cpp b/src/ledger/LedgerTxn.cpp index 513a977381..fee0e46a5c 100644 --- a/src/ledger/LedgerTxn.cpp +++ b/src/ledger/LedgerTxn.cpp @@ -4,6 +4,7 @@ #include "ledger/LedgerTxn.h" #include "bucket/BucketList.h" +#include "bucket/BucketListSnapshot.h" #include "bucket/BucketManager.h" #include "crypto/Hex.h" #include "crypto/KeyUtils.h" @@ -2987,7 +2988,7 @@ LedgerTxnRoot::Impl::prefetch(UnorderedSet const& keys) insertIfNotLoaded(keysToSearch, key); } - auto blLoad = mApp.getBucketManager().loadKeys(keysToSearch); + auto blLoad = getSearchableBucketListSnapshot().loadKeys(keysToSearch); cacheResult(populateLoadedEntries(keysToSearch, blLoad)); } else @@ -3369,6 +3370,20 @@ LedgerTxnRoot::Impl::areEntriesMissingInCacheForOffer(OfferEntry const& oe) return false; } +SearchableBucketListSnapshot& +LedgerTxnRoot::Impl::getSearchableBucketListSnapshot() const +{ + releaseAssert(mApp.getConfig().isUsingBucketListDB()); + if (!mSearchableBucketListSnapshot) + { + mSearchableBucketListSnapshot = mApp.getBucketManager() + .getBucketSnapshotManager() + .getSearchableBucketListSnapshot(); + } + + return *mSearchableBucketListSnapshot; +} + std::shared_ptr LedgerTxnRoot::Impl::getBestOffer(Asset const& buying, Asset const& selling, OfferDescriptor const* worseThan) @@ -3505,7 +3520,7 @@ LedgerTxnRoot::Impl::getPoolShareTrustLinesByAccountAndAsset( if (mApp.getConfig().isUsingBucketListDB()) { trustLines = - mApp.getBucketManager() + getSearchableBucketListSnapshot() .loadPoolShareTrustLinesByAccountAndAsset(account, asset); } else @@ -3568,8 +3583,8 @@ LedgerTxnRoot::Impl::getInflationWinners(size_t maxWinners, int64_t minVotes) { if (mApp.getConfig().isUsingBucketListDB()) { - return mApp.getBucketManager().loadInflationWinners(maxWinners, - minVotes); + return getSearchableBucketListSnapshot().loadInflationWinners( + maxWinners, minVotes); } else { @@ -3624,7 +3639,7 @@ LedgerTxnRoot::Impl::getNewestVersion(InternalLedgerKey const& gkey) const { if (mApp.getConfig().isUsingBucketListDB() && key.type() != OFFER) { - entry = mApp.getBucketManager().getLedgerEntry(key); + entry = getSearchableBucketListSnapshot().getLedgerEntry(key); } else { diff --git a/src/ledger/LedgerTxnImpl.h b/src/ledger/LedgerTxnImpl.h index 49e566245b..ad83395703 100644 --- a/src/ledger/LedgerTxnImpl.h +++ b/src/ledger/LedgerTxnImpl.h @@ -4,6 +4,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#include "bucket/BucketList.h" #include "database/Database.h" #include "ledger/LedgerTxn.h" #include "util/RandomEvictionCache.h" @@ -19,6 +20,8 @@ namespace stellar { +class SearchableBucketListSnapshot; + // Precondition: The keys associated with entries are unique and constitute a // subset of keys template @@ -737,6 +740,8 @@ class LedgerTxnRoot::Impl mutable BestOffers mBestOffers; mutable uint64_t mPrefetchHits{0}; mutable uint64_t mPrefetchMisses{0}; + mutable std::unique_ptr + mSearchableBucketListSnapshot{}; size_t mBulkLoadBatchSize; std::unique_ptr mTransaction; @@ -869,6 +874,8 @@ class LedgerTxnRoot::Impl bool areEntriesMissingInCacheForOffer(OfferEntry const& oe); + SearchableBucketListSnapshot& getSearchableBucketListSnapshot() const; + public: // Constructor has the strong exception safety guarantee Impl(Application& app, size_t entryCacheSize, size_t prefetchBatchSize