Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable background overlay by default, drop 'experimental' prefix #4534

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void
BucketManagerImpl::initialize()
{
ZoneScoped;
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
std::string d = mConfig.BUCKET_DIR_PATH;

if (!fs::exists(d))
{
Expand Down Expand Up @@ -122,17 +122,17 @@ BucketManagerImpl::initialize()
mLockedBucketDir = std::make_unique<std::string>(d);
mTmpDirManager = std::make_unique<TmpDirManager>(d + "/tmp");

if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
if (mConfig.MODE_ENABLES_BUCKETLIST)
{
mBucketList = std::make_unique<BucketList>();

if (mApp.getConfig().isUsingBucketListDB())
if (mConfig.isUsingBucketListDB())
{
mSnapshotManager = std::make_unique<BucketSnapshotManager>(
mApp,
std::make_unique<BucketListSnapshot>(*mBucketList,
LedgerHeader()),
mApp.getConfig().QUERY_SNAPSHOT_LEDGERS);
mConfig.QUERY_SNAPSHOT_LEDGERS);
}
}

Expand All @@ -141,11 +141,10 @@ BucketManagerImpl::initialize()
// BUCKET_DIR_PATH, HISTORY_FILE_TYPE_SCP is persisted to the database
// so create the remaining ledger header, transactions and results
// directories
createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mApp.getConfig());
HistoryManager::createPublishQueueDir(mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mConfig);
createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, mConfig);
createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mConfig);
HistoryManager::createPublishQueueDir(mConfig);
}

void
Expand Down Expand Up @@ -201,6 +200,7 @@ BucketManagerImpl::BucketManagerImpl(Application& app)
// mode does not use minimal DB
, mDeleteEntireBucketDirInDtor(
app.getConfig().isInMemoryModeWithoutMinimalDB())
, mConfig(app.getConfig())
{
for (uint32_t t =
static_cast<uint32_t>(LedgerEntryTypeAndDurability::ACCOUNT);
Expand Down Expand Up @@ -299,7 +299,7 @@ void
BucketManagerImpl::deleteEntireBucketDir()
{
ZoneScoped;
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
std::string d = mConfig.BUCKET_DIR_PATH;
if (fs::exists(d))
{
// First clean out the contents of the tmpdir, as usual.
Expand Down Expand Up @@ -332,7 +332,7 @@ BucketManagerImpl::deleteTmpDirAndUnlockBucketDir()
// Then delete the lockfile $BUCKET_DIR_PATH/stellar-core.lock
if (mLockedBucketDir)
{
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
std::string d = mConfig.BUCKET_DIR_PATH;
std::string lock = d + "/" + kLockFilename;
releaseAssert(fs::exists(lock));
fs::unlockFile(lock);
Expand All @@ -343,14 +343,14 @@ BucketManagerImpl::deleteTmpDirAndUnlockBucketDir()
BucketList&
BucketManagerImpl::getBucketList()
{
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
return *mBucketList;
}

BucketSnapshotManager&
BucketManagerImpl::getBucketSnapshotManager() const
{
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB());
releaseAssertOrThrow(mConfig.isUsingBucketListDB());
releaseAssert(mSnapshotManager);
return *mSnapshotManager;
}
Expand Down Expand Up @@ -476,7 +476,7 @@ BucketManagerImpl::renameBucketDirFile(std::filesystem::path const& src,
std::filesystem::path const& dst)
{
ZoneScoped;
if (mApp.getConfig().DISABLE_XDR_FSYNC)
if (mConfig.DISABLE_XDR_FSYNC)
{
return rename(src.string().c_str(), dst.string().c_str()) == 0;
}
Expand All @@ -492,7 +492,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
std::unique_ptr<BucketIndex const> index)
{
ZoneScoped;
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);

if (mergeKey)
Expand Down Expand Up @@ -566,7 +566,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
void
BucketManagerImpl::noteEmptyMergeOutput(MergeKey const& mergeKey)
{
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

// We _do_ want to remove the mergeKey from mLiveFutures, both so that that
// map does not grow without bound and more importantly so that we drop the
Expand Down Expand Up @@ -681,7 +681,7 @@ BucketManagerImpl::putMergeFuture(
MergeKey const& key, std::shared_future<std::shared_ptr<Bucket>> wp)
{
ZoneScoped;
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
CLOG_TRACE(
Bucket,
Expand All @@ -704,7 +704,7 @@ BucketManagerImpl::getBucketListReferencedBuckets() const
{
ZoneScoped;
std::set<Hash> referenced;
if (!mApp.getConfig().MODE_ENABLES_BUCKETLIST)
if (!mConfig.MODE_ENABLES_BUCKETLIST)
{
return referenced;
}
Expand Down Expand Up @@ -743,7 +743,7 @@ BucketManagerImpl::getAllReferencedBuckets() const
{
ZoneScoped;
auto referenced = getBucketListReferencedBuckets();
if (!mApp.getConfig().MODE_ENABLES_BUCKETLIST)
if (!mConfig.MODE_ENABLES_BUCKETLIST)
{
return referenced;
}
Expand Down Expand Up @@ -788,7 +788,7 @@ void
BucketManagerImpl::cleanupStaleFiles()
{
ZoneScoped;
if (mApp.getConfig().DISABLE_BUCKET_GC)
if (mConfig.DISABLE_BUCKET_GC)
{
return;
}
Expand Down Expand Up @@ -867,7 +867,7 @@ BucketManagerImpl::forgetUnreferencedBuckets()
CLOG_TRACE(Bucket,
"BucketManager::forgetUnreferencedBuckets dropping {}",
filename);
if (!filename.empty() && !mApp.getConfig().DISABLE_BUCKET_GC)
if (!filename.empty() && !mConfig.DISABLE_BUCKET_GC)
{
CLOG_TRACE(Bucket, "removing bucket file: {}", filename);
std::filesystem::remove(filename);
Expand Down Expand Up @@ -974,7 +974,7 @@ BucketManagerImpl::snapshotLedger(LedgerHeader& currentHeader)
{
ZoneScoped;
Hash hash;
if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
if (mConfig.MODE_ENABLES_BUCKETLIST)
{
hash = mBucketList->getHash();
}
Expand Down Expand Up @@ -1017,7 +1017,7 @@ BucketManagerImpl::scanForEvictionLegacy(AbstractLedgerTxn& ltx,
void
BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq)
{
releaseAssert(mApp.getConfig().isUsingBucketListDB());
releaseAssert(mConfig.isUsingBucketListDB());
releaseAssert(mSnapshotManager);
releaseAssert(!mEvictionFuture.valid());
releaseAssert(mEvictionStatistics);
Expand Down Expand Up @@ -1176,7 +1176,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
uint32_t maxProtocolVersion, bool restartMerges)
{
ZoneScoped;
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
{
Expand All @@ -1203,7 +1203,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,

// Buckets on the BucketList should always be indexed when
// BucketListDB enabled
if (mApp.getConfig().isUsingBucketListDB())
if (mConfig.isUsingBucketListDB())
{
releaseAssert(curr->isEmpty() || curr->isIndexed());
releaseAssert(snap->isEmpty() || snap->isIndexed());
Expand Down Expand Up @@ -1328,7 +1328,7 @@ BucketManagerImpl::mergeBuckets(HistoryArchiveState const& has)
BucketMetadata meta;
MergeCounters mc;
auto& ctx = mApp.getClock().getIOContext();
meta.ledgerVersion = mApp.getConfig().LEDGER_PROTOCOL_VERSION;
meta.ledgerVersion = mConfig.LEDGER_PROTOCOL_VERSION;
BucketOutputIterator out(getTmpDir(), /*keepDeadEntries=*/false, meta, mc,
ctx, /*doFsync=*/true);
for (auto const& pair : ledgerMap)
Expand Down Expand Up @@ -1539,13 +1539,13 @@ BucketManagerImpl::scheduleVerifyReferencedBucketsWork()
Config const&
BucketManagerImpl::getConfig() const
{
return mApp.getConfig();
return mConfig;
}

std::shared_ptr<SearchableBucketListSnapshot>
BucketManagerImpl::getSearchableBucketListSnapshot()
{
releaseAssert(mApp.getConfig().isUsingBucketListDB());
releaseAssert(mConfig.isUsingBucketListDB());
// Any other threads must maintain their own snapshot
releaseAssert(threadIsMain());
if (!mSearchableBucketListSnapshot)
Expand All @@ -1560,7 +1560,7 @@ BucketManagerImpl::getSearchableBucketListSnapshot()
void
BucketManagerImpl::reportBucketEntryCountMetrics()
{
if (!mApp.getConfig().isUsingBucketListDB())
if (!mConfig.isUsingBucketListDB())
{
return;
}
Expand Down
3 changes: 3 additions & 0 deletions src/bucket/BucketManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "bucket/BucketList.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketMergeMap.h"
#include "main/Config.h"
#include "xdr/Stellar-ledger.h"

#include <map>
Expand Down Expand Up @@ -72,6 +73,8 @@ class BucketManagerImpl : public BucketManager
std::future<EvictionResult> mEvictionFuture{};

bool const mDeleteEntireBucketDirInDtor;
// Copy app's config for thread-safe access
Config const mConfig;

// Records bucket-merges that are currently _live_ in some FutureBucket, in
// the sense of either running, or finished (with or without the
Expand Down
2 changes: 1 addition & 1 deletion src/herder/test/HerderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3361,7 +3361,7 @@ TEST_CASE("overlay parallel processing")
Topologies::core(4, 1, Simulation::OVER_TCP, networkID, [](int i) {
auto cfg = getTestConfig(i);
cfg.TESTING_UPGRADE_MAX_TX_SET_SIZE = 100;
cfg.EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING = true;
cfg.BACKGROUND_OVERLAY_PROCESSING = true;
return cfg;
});
simulation->startAllNodes();
Expand Down
4 changes: 2 additions & 2 deletions src/main/ApplicationImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
mEvictionIOContext
? std::make_unique<asio::io_context::work>(*mEvictionIOContext)
: nullptr)
, mOverlayIOContext(mConfig.EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING
, mOverlayIOContext(mConfig.BACKGROUND_OVERLAY_PROCESSING
? std::make_unique<asio::io_context>(1)
: nullptr)
, mOverlayWork(mOverlayIOContext ? std::make_unique<asio::io_context::work>(
Expand Down Expand Up @@ -181,7 +181,7 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
mWorkerThreads.emplace_back(std::move(thread));
}

if (mConfig.EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING)
if (mConfig.BACKGROUND_OVERLAY_PROCESSING)
{
// Keep priority unchanged as overlay processes time-sensitive tasks
mOverlayThread = std::thread{[this]() { mOverlayIOContext->run(); }};
Expand Down
17 changes: 11 additions & 6 deletions src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
CATCHUP_COMPLETE = false;
CATCHUP_RECENT = 0;
EXPERIMENTAL_PRECAUTION_DELAY_META = false;
EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING = false;
BACKGROUND_OVERLAY_PROCESSING = true;
DEPRECATED_SQL_LEDGER_STATE = false;
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14; // 2^14 == 16 kb
BUCKETLIST_DB_INDEX_CUTOFF = 20; // 20 mb
Expand Down Expand Up @@ -1066,9 +1066,14 @@ Config::processConfig(std::shared_ptr<cpptoml::table> t)
}},
{"EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING",
[&]() {
EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING =
readBool(item);
CLOG_WARNING(Overlay,
"EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING "
"is deprecated. Use "
"BACKGROUND_OVERLAY_PROCESSING instead");
BACKGROUND_OVERLAY_PROCESSING = readBool(item);
}},
{"BACKGROUND_OVERLAY_PROCESSING",
[&]() { BACKGROUND_OVERLAY_PROCESSING = readBool(item); }},
{"BACKGROUND_EVICTION_SCAN",
[&]() { BACKGROUND_EVICTION_SCAN = readBool(item); }},
// TODO: Flag is no longer supported, remove in next release.
Expand Down Expand Up @@ -1970,7 +1975,7 @@ Config::adjust()
}

void
Config::logBasicInfo()
Config::logBasicInfo() const
{
LOG_INFO(DEFAULT_LOG, "Connection effective settings:");
LOG_INFO(DEFAULT_LOG, "TARGET_PEER_CONNECTIONS: {}",
Expand All @@ -1984,9 +1989,9 @@ Config::logBasicInfo()
LOG_INFO(DEFAULT_LOG, "MAX_INBOUND_PENDING_CONNECTIONS: {}",
MAX_INBOUND_PENDING_CONNECTIONS);
LOG_INFO(DEFAULT_LOG,
"EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING="
"BACKGROUND_OVERLAY_PROCESSING="
"{}",
EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING ? "true" : "false");
BACKGROUND_OVERLAY_PROCESSING ? "true" : "false");
}

void
Expand Down
4 changes: 2 additions & 2 deletions src/main/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class Config : public std::enable_shared_from_this<Config>
size_t BUCKETLIST_DB_INDEX_CUTOFF;

// Enable parallel processing of overlay operations (experimental)
bool EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING;
bool BACKGROUND_OVERLAY_PROCESSING;

// When set to true, BucketListDB indexes are persisted on-disk so that the
// BucketList does not need to be reindexed on startup. Defaults to true.
Expand Down Expand Up @@ -739,7 +739,7 @@ class Config : public std::enable_shared_from_this<Config>
bool isPersistingBucketListDBIndexes() const;
bool modeStoresAllHistory() const;
bool modeStoresAnyHistory() const;
void logBasicInfo();
void logBasicInfo() const;
void setNoListen();
void setNoPublish();

Expand Down
22 changes: 11 additions & 11 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,17 @@ OverlayManagerImpl::triggerPeerResolution()

// Trigger DNS resolution on the background thread
using task_t = std::packaged_task<ResolvedPeers()>;
std::shared_ptr<task_t> task = std::make_shared<task_t>([this]() {
if (!this->mShuttingDown)
{
auto known = resolvePeers(this->mApp.getConfig().KNOWN_PEERS);
auto preferred =
resolvePeers(this->mApp.getConfig().PREFERRED_PEERS);
return ResolvedPeers{known.first, preferred.first,
known.second || preferred.second};
}
return ResolvedPeers{{}, {}, false};
});
std::shared_ptr<task_t> task =
std::make_shared<task_t>([this, cfg = mApp.getConfig()]() {
if (!this->mShuttingDown)
{
auto known = resolvePeers(cfg.KNOWN_PEERS);
auto preferred = resolvePeers(cfg.PREFERRED_PEERS);
return ResolvedPeers{known.first, preferred.first,
known.second || preferred.second};
}
return ResolvedPeers{{}, {}, false};
});

mResolvedPeers = task->get_future();
mApp.postOnBackgroundThread(bind(&task_t::operator(), task),
Expand Down
3 changes: 1 addition & 2 deletions src/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ class Peer : public std::enable_shared_from_this<Peer>,
virtual bool
useBackgroundThread() const
{
return mAppConnector.getConfig()
.EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING;
return mAppConnector.getConfig().BACKGROUND_OVERLAY_PROCESSING;
}

void initialize(PeerBareAddress const& address);
Expand Down
7 changes: 3 additions & 4 deletions src/overlay/PeerDoor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@ PeerDoor::acceptNextPeer()
// io_context on main (as long as the socket is not accessed by multiple
// threads simultaneously, or the caller manually synchronizes access to the
// socket).
auto& ioContext =
mApp.getConfig().EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING
? mApp.getOverlayIOContext()
: mApp.getClock().getIOContext();
auto& ioContext = mApp.getConfig().BACKGROUND_OVERLAY_PROCESSING
? mApp.getOverlayIOContext()
: mApp.getClock().getIOContext();
auto sock = make_shared<TCPPeer::SocketType>(ioContext, TCPPeer::BUFSZ);
mAcceptor.async_accept(sock->next_layer(),
[this, sock](asio::error_code const& ec) {
Expand Down
Loading
Loading