Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BucketListDB in-memory Buckets #4630

Merged
merged 4 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ bucketlist-archive.size.bytes | counter | total size of the hot ar
bucketlist.size.bytes | counter | total size of the BucketList in bytes
bucketlist.entryCounts.-<X> | counter | number of entries of type <X> in the BucketList
bucketlist.entrySizes.-<X> | counter | size of entries of type <X> in the BucketList
bucketlistDB.bloom.lookups | meter | number of bloom filter lookups
bucketlistDB.bloom.misses | meter | number of bloom filter false positives
bucketlistDB.bulk.loads | meter | number of entries BucketListDB queried to prefetch
bucketlistDB.bulk.inflationWinners | timer | time to load inflation winners
bucketlistDB.bulk.poolshareTrustlines | timer | time to load poolshare trustlines by accountID and assetID
bucketlistDB.bulk.prefetch | timer | time to prefetch
bucketlistDB.point.<X> | timer | time to load single entry of type <X> (if no bloom miss occurred)
bucketlistDB-<X>.bloom.lookups | meter | number of bloom filter lookups on BucketList <X> (live/hotArchive)
bucketlistDB-<X>.bloom.misses | meter | number of bloom filter false positives on BucketList <X> (live/hotArchive)
bucketlistDB-<X>.bulk.loads | meter | number of entries BucketListDB queried to prefetch on BucketList <X> (live/hot-archive)
bucketlistDB-live.bulk.inflationWinners | timer | time to load inflation winners
bucketlistDB-live.bulk.poolshareTrustlines | timer | time to load poolshare trustlines by accountID and assetID
bucketlistDB-live.bulk.prefetch | timer | time to prefetch
bucketlistDB-<X>.point.<y> | timer | time to load single entry of type <Y> on BucketList <X> (live/hotArchive)
crypto.verify.hit | meter | number of signature cache hits
crypto.verify.miss | meter | number of signature cache misses
crypto.verify.total | meter | sum of both hits and misses
Expand Down
4 changes: 2 additions & 2 deletions docs/stellar-core_example.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,13 @@ MAX_DEX_TX_OPERATIONS_IN_TX_SET = 0
# 0, individual index is always used. Default page size 16 kb.
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14

# BUCKETLIST_DB_INDEX_CUTOFF (Integer) default 20
# BUCKETLIST_DB_INDEX_CUTOFF (Integer) default 250
# Size, in MB, determining whether a bucket should have an individual
# key index or a key range index. If bucket size is below this value, range
# based index will be used. If set to 0, all buckets are range indexed. If
# BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT == 0, value ignored and all
# buckets have individual key index.
BUCKETLIST_DB_INDEX_CUTOFF = 20
BUCKETLIST_DB_INDEX_CUTOFF = 250

# BUCKETLIST_DB_PERSIST_INDEX (bool) default true
# Determines whether BucketListDB indexes are saved to disk for faster
Expand Down
4 changes: 2 additions & 2 deletions src/bucket/BucketApplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ shouldApplyEntry(BucketEntry const& e)
{
if (e.type() == LIVEENTRY || e.type() == INITENTRY)
{
return BucketIndex::typeNotSupported(e.liveEntry().data.type());
return LiveBucketIndex::typeNotSupported(e.liveEntry().data.type());
}

if (e.type() != DEADENTRY)
{
throw std::runtime_error(
"Malformed bucket: unexpected non-INIT/LIVE/DEAD entry.");
}
return BucketIndex::typeNotSupported(e.deadEntry().type());
return LiveBucketIndex::typeNotSupported(e.deadEntry().type());
}

size_t
Expand Down
103 changes: 49 additions & 54 deletions src/bucket/BucketBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
// else.
#include "util/asio.h" // IWYU pragma: keep
#include "bucket/BucketBase.h"
#include "bucket/BucketIndex.h"
#include "bucket/BucketInputIterator.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketOutputIterator.h"
Expand All @@ -30,36 +29,35 @@
namespace stellar
{

BucketIndex const&
BucketBase::getIndex() const
template <class BucketT, class IndexT>
IndexT const&
BucketBase<BucketT, IndexT>::getIndex() const
{
ZoneScoped;
releaseAssertOrThrow(!mFilename.empty());
releaseAssertOrThrow(mIndex);
return *mIndex;
}

template <class BucketT, class IndexT>
bool
BucketBase::isIndexed() const
BucketBase<BucketT, IndexT>::isIndexed() const
{
return static_cast<bool>(mIndex);
}

std::optional<std::pair<std::streamoff, std::streamoff>>
BucketBase::getOfferRange() const
{
return getIndex().getOfferRange();
}

template <class BucketT, class IndexT>
void
BucketBase::setIndex(std::unique_ptr<BucketIndex const>&& index)
BucketBase<BucketT, IndexT>::setIndex(std::unique_ptr<IndexT const>&& index)
{
releaseAssertOrThrow(!mIndex);
mIndex = std::move(index);
}

BucketBase::BucketBase(std::string const& filename, Hash const& hash,
std::unique_ptr<BucketIndex const>&& index)
template <class BucketT, class IndexT>
BucketBase<BucketT, IndexT>::BucketBase(std::string const& filename,
Hash const& hash,
std::unique_ptr<IndexT const>&& index)
: mFilename(filename), mHash(hash), mIndex(std::move(index))
{
releaseAssert(filename.empty() || fs::exists(filename));
Expand All @@ -71,30 +69,34 @@ BucketBase::BucketBase(std::string const& filename, Hash const& hash,
}
}

BucketBase::BucketBase()
template <class BucketT, class IndexT> BucketBase<BucketT, IndexT>::BucketBase()
{
}

template <class BucketT, class IndexT>
Hash const&
BucketBase::getHash() const
BucketBase<BucketT, IndexT>::getHash() const
{
return mHash;
}

template <class BucketT, class IndexT>
std::filesystem::path const&
BucketBase::getFilename() const
BucketBase<BucketT, IndexT>::getFilename() const
{
return mFilename;
}

template <class BucketT, class IndexT>
size_t
BucketBase::getSize() const
BucketBase<BucketT, IndexT>::getSize() const
{
return mSize;
}

template <class BucketT, class IndexT>
bool
BucketBase::isEmpty() const
BucketBase<BucketT, IndexT>::isEmpty() const
{
if (mFilename.empty() || isZero(mHash))
{
Expand All @@ -105,14 +107,17 @@ BucketBase::isEmpty() const
return false;
}

template <class BucketT, class IndexT>
void
BucketBase::freeIndex()
BucketBase<BucketT, IndexT>::freeIndex()
{
mIndex.reset(nullptr);
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomFileName(std::string const& tmpDir, std::string ext)
BucketBase<BucketT, IndexT>::randomFileName(std::string const& tmpDir,
std::string ext)
{
ZoneScoped;
for (;;)
Expand All @@ -127,14 +132,16 @@ BucketBase::randomFileName(std::string const& tmpDir, std::string ext)
}
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomBucketName(std::string const& tmpDir)
BucketBase<BucketT, IndexT>::randomBucketName(std::string const& tmpDir)
{
return randomFileName(tmpDir, ".xdr");
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomBucketIndexName(std::string const& tmpDir)
BucketBase<BucketT, IndexT>::randomBucketIndexName(std::string const& tmpDir)
{
return randomFileName(tmpDir, ".index");
}
Expand Down Expand Up @@ -172,7 +179,7 @@ BucketBase::randomBucketIndexName(std::string const& tmpDir)
// and shadowing protocol simultaneously, the moment the first new-protocol
// bucket enters the youngest level. At least one new bucket is in every merge's
// shadows from then on in, so they all upgrade (and preserve lifecycle events).
template <class BucketT>
template <class BucketT, class IndexT>
static void
calculateMergeProtocolVersion(
MergeCounters& mc, uint32_t maxProtocolVersion,
Expand Down Expand Up @@ -253,7 +260,7 @@ calculateMergeProtocolVersion(
// side, or entries that compare non-equal. In all these cases we just
// take the lesser (or existing) entry and advance only one iterator,
// not scrutinizing the entry type further.
template <class BucketT>
template <class BucketT, class IndexT>
static bool
mergeCasesWithDefaultAcceptance(
BucketEntryIdCmp<BucketT> const& cmp, MergeCounters& mc,
Expand Down Expand Up @@ -299,14 +306,15 @@ mergeCasesWithDefaultAcceptance(
return false;
}

template <class BucketT>
template <class BucketT, class IndexT>
std::shared_ptr<BucketT>
BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync)
BucketBase<BucketT, IndexT>::merge(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync)
{
BUCKET_TYPE_ASSERT(BucketT);

Expand All @@ -326,9 +334,9 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,

uint32_t protocolVersion;
bool keepShadowedLifecycleEntries;
calculateMergeProtocolVersion<BucketT>(mc, maxProtocolVersion, oi, ni,
shadowIterators, protocolVersion,
keepShadowedLifecycleEntries);
calculateMergeProtocolVersion<BucketT, IndexT>(
mc, maxProtocolVersion, oi, ni, shadowIterators, protocolVersion,
keepShadowedLifecycleEntries);

auto timer = bucketManager.getMergeTimer().TimeScope();
BucketMetadata meta;
Expand All @@ -340,14 +348,14 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
{
releaseAssertOrThrow(protocolVersionStartsFrom(
maxProtocolVersion,
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
BucketT::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
meta.ext = ni.getMetadata().ext;
}
else if (oi.getMetadata().ext.v() == 1)
{
releaseAssertOrThrow(protocolVersionStartsFrom(
maxProtocolVersion,
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
BucketT::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
meta.ext = oi.getMetadata().ext;
}

Expand All @@ -374,9 +382,9 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
}
}

if (!mergeCasesWithDefaultAcceptance(cmp, mc, oi, ni, out,
shadowIterators, protocolVersion,
keepShadowedLifecycleEntries))
if (!mergeCasesWithDefaultAcceptance<BucketT, IndexT>(
cmp, mc, oi, ni, out, shadowIterators, protocolVersion,
keepShadowedLifecycleEntries))
{
BucketT::mergeCasesWithEqualKeys(mc, oi, ni, out, shadowIterators,
protocolVersion,
Expand All @@ -400,19 +408,6 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
return out.getBucket(bucketManager, &mk);
}

template std::shared_ptr<LiveBucket> BucketBase::merge<LiveBucket>(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<LiveBucket> const& oldBucket,
std::shared_ptr<LiveBucket> const& newBucket,
std::vector<std::shared_ptr<LiveBucket>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);

template std::shared_ptr<HotArchiveBucket> BucketBase::merge<HotArchiveBucket>(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<HotArchiveBucket> const& oldBucket,
std::shared_ptr<HotArchiveBucket> const& newBucket,
std::vector<std::shared_ptr<HotArchiveBucket>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);
template class BucketBase<LiveBucket, LiveBucket::IndexT>;
template class BucketBase<HotArchiveBucket, HotArchiveBucket::IndexT>;
}
42 changes: 27 additions & 15 deletions src/bucket/BucketBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// under the apache license, version 2.0. see the copying file at the root
// of this distribution or at http://www.apache.org/licenses/license-2.0

#include "bucket/BucketIndex.h"
#include "bucket/BucketUtils.h"
#include "util/NonCopyable.h"
#include "util/ProtocolVersion.h"
#include "xdr/Stellar-types.h"
Expand Down Expand Up @@ -47,17 +47,37 @@ enum class Loop
INCOMPLETE
};

class HotArchiveBucket;
class LiveBucket;
class LiveBucketIndex;
class HotArchiveBucketIndex;

template <class BucketT, class IndexT>
class BucketBase : public NonMovableOrCopyable
{
BUCKET_TYPE_ASSERT(BucketT);

// Because of the CRTP design with derived Bucket classes, this base class
// does not have direct access to BucketT::IndexT, so we take two templates
// and make this assert.
static_assert(
std::is_same_v<
IndexT,
std::conditional_t<
std::is_same_v<BucketT, LiveBucket>, LiveBucketIndex,
std::conditional_t<std::is_same_v<BucketT, HotArchiveBucket>,
HotArchiveBucketIndex, void>>>,
"IndexT must match BucketT::IndexT");

protected:
std::filesystem::path const mFilename;
Hash const mHash;
size_t mSize{0};

std::unique_ptr<BucketIndex const> mIndex{};
std::unique_ptr<IndexT const> mIndex{};

// Returns index, throws if index not yet initialized
BucketIndex const& getIndex() const;
IndexT const& getIndex() const;

static std::string randomFileName(std::string const& tmpDir,
std::string ext);
Expand All @@ -74,7 +94,7 @@ class BucketBase : public NonMovableOrCopyable
// exists, but does not check that the hash is the bucket's hash. Caller
// needs to ensure that.
BucketBase(std::string const& filename, Hash const& hash,
std::unique_ptr<BucketIndex const>&& index);
std::unique_ptr<IndexT const>&& index);

Hash const& getHash() const;
std::filesystem::path const& getFilename() const;
Expand All @@ -88,13 +108,8 @@ class BucketBase : public NonMovableOrCopyable
// Returns true if bucket is indexed, false otherwise
bool isIndexed() const;

// Returns [lowerBound, upperBound) of file offsets for all offers in the
// bucket, or std::nullopt if no offers exist
std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const;

// Sets index, throws if index is already set
void setIndex(std::unique_ptr<BucketIndex const>&& index);
void setIndex(std::unique_ptr<IndexT const>&& index);

// Merge two buckets together, producing a fresh one. Entries in `oldBucket`
// are overridden in the fresh bucket by keywise-equal entries in
Expand All @@ -107,7 +122,6 @@ class BucketBase : public NonMovableOrCopyable
// `maxProtocolVersion` bounds this (for error checking) and should usually
// be the protocol of the ledger header at which the merge is starting. An
// exception will be thrown if any provided bucket versions exceed it.
template <class BucketT>
static std::shared_ptr<BucketT>
merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
Expand All @@ -120,16 +134,14 @@ class BucketBase : public NonMovableOrCopyable
static std::string randomBucketIndexName(std::string const& tmpDir);

#ifdef BUILD_TESTS
BucketIndex const&
IndexT const&
getIndexForTesting() const
{
return getIndex();
}

#endif // BUILD_TESTS

virtual uint32_t getBucketVersion() const = 0;

template <class BucketT> friend class BucketSnapshotBase;
template <class T> friend class BucketSnapshotBase;
};
}
Loading
Loading