Skip to content

Commit

Permalink
feat: Graceful shutdown (#1801)
Browse files Browse the repository at this point in the history
Fixes #442.
  • Loading branch information
kuznetsss authored Jan 22, 2025
1 parent 12e6fcc commit 9570286
Show file tree
Hide file tree
Showing 41 changed files with 1,072 additions and 190 deletions.
2 changes: 1 addition & 1 deletion src/app/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add_library(clio_app)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp WebHandlers.cpp)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp Stopper.cpp WebHandlers.cpp)

target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration)
8 changes: 8 additions & 0 deletions src/app/ClioApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "app/ClioApplication.hpp"

#include "app/Stopper.hpp"
#include "app/WebHandlers.hpp"
#include "data/AmendmentCenter.hpp"
#include "data/BackendFactory.hpp"
Expand All @@ -45,6 +46,8 @@
#include "web/ng/Server.hpp"

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>

#include <cstdint>
#include <cstdlib>
Expand Down Expand Up @@ -84,6 +87,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi
{
LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString();
PrometheusService::init(config);
signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); });
}

int
Expand Down Expand Up @@ -169,6 +173,10 @@ ClioApplication::run(bool const useNgWebServer)
return EXIT_FAILURE;
}

appStopper_.setOnStop(
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc)
);

// Blocks until stopped.
// When stopped, shared_ptrs fall out of scope
// Calls destructors on all resources, and destructs in order
Expand Down
2 changes: 2 additions & 0 deletions src/app/ClioApplication.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include "app/Stopper.hpp"
#include "util/SignalsHandler.hpp"
#include "util/newconfig/ConfigDefinition.hpp"

Expand All @@ -30,6 +31,7 @@ namespace app {
class ClioApplication {
util::config::ClioConfigDefinition const& config_;
util::SignalsHandler signalsHandler_;
Stopper appStopper_;

public:
/**
Expand Down
52 changes: 52 additions & 0 deletions src/app/Stopper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include "app/Stopper.hpp"

#include <boost/asio/spawn.hpp>

#include <functional>
#include <thread>
#include <utility>

namespace app {

Stopper::~Stopper()
{
if (worker_.joinable())
worker_.join();
}

void
Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb)
{
boost::asio::spawn(ctx_, std::move(cb));
}

void
Stopper::stop()
{
// Do nothing if worker_ is already running
if (worker_.joinable())
return;

worker_ = std::thread{[this]() { ctx_.run(); }};
}

} // namespace app
118 changes: 118 additions & 0 deletions src/app/Stopper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#pragma once

#include "data/BackendInterface.hpp"
#include "etl/ETLService.hpp"
#include "etl/LoadBalancer.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/log/Logger.hpp"
#include "web/ng/Server.hpp"

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>

#include <functional>
#include <thread>

namespace app {

/**
* @brief Application stopper class. On stop it will create a new thread to run all the shutdown tasks.
*/
class Stopper {
boost::asio::io_context ctx_;
std::thread worker_;

public:
/**
* @brief Destroy the Stopper object
*/
~Stopper();

/**
* @brief Set the callback to be called when the application is stopped.
*
* @param cb The callback to be called on application stop.
*/
void
setOnStop(std::function<void(boost::asio::yield_context)> cb);

/**
* @brief Stop the application and run the shutdown tasks.
*/
void
stop();

/**
* @brief Create a callback to be called on application stop.
*
* @param server The server to stop.
* @param balancer The load balancer to stop.
* @param etl The ETL service to stop.
* @param subscriptions The subscription manager to stop.
* @param backend The backend to stop.
* @param ioc The io_context to stop.
* @return The callback to be called on application stop.
*/
template <
web::ng::SomeServer ServerType,
etl::SomeLoadBalancer LoadBalancerType,
etl::SomeETLService ETLServiceType>
static std::function<void(boost::asio::yield_context)>
makeOnStopCallback(
ServerType& server,
LoadBalancerType& balancer,
ETLServiceType& etl,
feed::SubscriptionManagerInterface& subscriptions,
data::BackendInterface& backend,
boost::asio::io_context& ioc
)
{
return [&](boost::asio::yield_context yield) {
util::CoroutineGroup coroutineGroup{yield};
coroutineGroup.spawn(yield, [&server](auto innerYield) {
server.stop(innerYield);
LOG(util::LogService::info()) << "Server stopped";
});
coroutineGroup.spawn(yield, [&balancer](auto innerYield) {
balancer.stop(innerYield);
LOG(util::LogService::info()) << "LoadBalancer stopped";
});
coroutineGroup.asyncWait(yield);

etl.stop();
LOG(util::LogService::info()) << "ETL stopped";

subscriptions.stop();
LOG(util::LogService::info()) << "SubscriptionManager stopped";

backend.waitForWritesToFinish();
LOG(util::LogService::info()) << "Backend writes finished";

ioc.stop();
LOG(util::LogService::info()) << "io_context stopped";
};
}
};

} // namespace app
6 changes: 6 additions & 0 deletions src/data/BackendInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,12 @@ class BackendInterface {
bool
finishWrites(std::uint32_t ledgerSequence);

/**
* @brief Wait for all pending writes to finish.
*/
virtual void
waitForWritesToFinish() = 0;

/**
* @brief Mark the migration status of a migrator as Migrated in the database
*
Expand Down
9 changes: 7 additions & 2 deletions src/data/CassandraBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,16 @@ class BasicCassandraBackend : public BackendInterface {
return {txns, {}};
}

void
waitForWritesToFinish() override
{
executor_.sync();
}

bool
doFinishWrites() override
{
// wait for other threads to finish their writes
executor_.sync();
waitForWritesToFinish();

if (!range_) {
executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
Expand Down
29 changes: 25 additions & 4 deletions src/etl/ETLService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>

#include <concepts>
#include <cstddef>
#include <cstdint>
#include <memory>
Expand All @@ -58,6 +59,16 @@ struct NFTsData;
*/
namespace etl {

/**
* @brief A tag class to help identify ETLService in templated code.
*/
struct ETLServiceTag {
virtual ~ETLServiceTag() = default;
};

template <typename T>
concept SomeETLService = std::derived_from<T, ETLServiceTag>;

/**
* @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the
* databases.
Expand All @@ -71,7 +82,7 @@ namespace etl {
* the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring
* to writing and from writing to monitoring, based on the activity of other processes running on different machines.
*/
class ETLService {
class ETLService : public ETLServiceTag {
// TODO: make these template parameters in ETLService
using LoadBalancerType = LoadBalancer;
using DataPipeType = etl::impl::ExtractionDataPipe<org::xrpl::rpc::v1::GetLedgerResponse>;
Expand Down Expand Up @@ -159,10 +170,20 @@ class ETLService {
/**
* @brief Stops components and joins worker thread.
*/
~ETLService()
~ETLService() override
{
if (not state_.isStopping)
stop();
}

/**
* @brief Stop the ETL service.
* @note This method blocks until the ETL service has stopped.
*/
void
stop()
{
LOG(log_.info()) << "onStop called";
LOG(log_.debug()) << "Stopping Reporting ETL";
LOG(log_.info()) << "Stop called";

state_.isStopping = true;
cacheLoader_.stop();
Expand Down
11 changes: 11 additions & 0 deletions src/etl/LoadBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp"
#include "util/Assert.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/Random.hpp"
#include "util/ResponseExpirationCache.hpp"
#include "util/log/Logger.hpp"
Expand Down Expand Up @@ -336,6 +337,16 @@ LoadBalancer::getETLState() noexcept
return etlState_;
}

void
LoadBalancer::stop(boost::asio::yield_context yield)
{
util::CoroutineGroup group{yield};
std::ranges::for_each(sources_, [&group, yield](auto& source) {
group.spawn(yield, [&source](boost::asio::yield_context innerYield) { source->stop(innerYield); });
});
group.asyncWait(yield);
}

void
LoadBalancer::chooseForwardingSource()
{
Expand Down
Loading

0 comments on commit 9570286

Please sign in to comment.