From 957028699b8869ad4bc6907bfeeecdda6b7cfcb2 Mon Sep 17 00:00:00 2001 From: Sergey Kuznetsov Date: Wed, 22 Jan 2025 13:09:16 +0000 Subject: [PATCH] feat: Graceful shutdown (#1801) Fixes #442. --- src/app/CMakeLists.txt | 2 +- src/app/ClioApplication.cpp | 8 + src/app/ClioApplication.hpp | 2 + src/app/Stopper.cpp | 52 +++++ src/app/Stopper.hpp | 118 +++++++++++ src/data/BackendInterface.hpp | 6 + src/data/CassandraBackend.hpp | 9 +- src/etl/ETLService.hpp | 29 ++- src/etl/LoadBalancer.cpp | 11 + src/etl/LoadBalancer.hpp | 24 ++- src/etl/Source.hpp | 9 + src/etl/impl/SourceImpl.hpp | 6 + src/etl/impl/SubscriptionSource.cpp | 84 ++++---- src/etl/impl/SubscriptionSource.hpp | 14 +- src/feed/SubscriptionManager.hpp | 9 + src/feed/SubscriptionManagerInterface.hpp | 6 + src/util/CMakeLists.txt | 1 + src/util/StopHelper.cpp | 46 +++++ src/util/StopHelper.hpp | 54 +++++ src/web/ng/Server.cpp | 40 +++- src/web/ng/Server.hpp | 21 +- src/web/ng/impl/ConnectionHandler.cpp | 65 +++++- src/web/ng/impl/ConnectionHandler.hpp | 23 ++- src/web/ng/impl/HttpConnection.hpp | 8 + src/web/ng/impl/WsConnection.hpp | 8 + tests/common/util/AsioContextTestFixture.hpp | 45 +++++ tests/common/util/MockBackend.hpp | 2 + tests/common/util/MockSource.hpp | 8 +- tests/common/util/MockSubscriptionManager.hpp | 2 + tests/common/web/ng/MockConnection.hpp | 2 +- .../common/web/ng/impl/MockHttpConnection.hpp | 2 +- tests/common/web/ng/impl/MockWsConnection.hpp | 2 +- tests/unit/CMakeLists.txt | 2 + tests/unit/app/StopperTests.cpp | 116 +++++++++++ tests/unit/etl/LoadBalancerTests.cpp | 9 + tests/unit/etl/SourceImplTests.cpp | 10 +- tests/unit/etl/SubscriptionSourceTests.cpp | 191 +++++++++--------- tests/unit/util/StopHelperTests.cpp | 62 ++++++ tests/unit/web/ng/ServerTests.cpp | 50 ++++- .../web/ng/impl/ConnectionHandlerTests.cpp | 78 ++++++- tests/unit/web/ng/impl/WsConnectionTests.cpp | 26 +++ 41 files changed, 1072 insertions(+), 190 deletions(-) create mode 100644 src/app/Stopper.cpp create mode 
100644 src/app/Stopper.hpp create mode 100644 src/util/StopHelper.cpp create mode 100644 src/util/StopHelper.hpp create mode 100644 tests/unit/app/StopperTests.cpp create mode 100644 tests/unit/util/StopHelperTests.cpp diff --git a/src/app/CMakeLists.txt b/src/app/CMakeLists.txt index 0559fa49c..40cbbbc14 100644 --- a/src/app/CMakeLists.txt +++ b/src/app/CMakeLists.txt @@ -1,4 +1,4 @@ add_library(clio_app) -target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp WebHandlers.cpp) +target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp Stopper.cpp WebHandlers.cpp) target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration) diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index 393c7b386..4056306f4 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -19,6 +19,7 @@ #include "app/ClioApplication.hpp" +#include "app/Stopper.hpp" #include "app/WebHandlers.hpp" #include "data/AmendmentCenter.hpp" #include "data/BackendFactory.hpp" @@ -45,6 +46,8 @@ #include "web/ng/Server.hpp" #include +#include +#include #include #include @@ -84,6 +87,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi { LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString(); PrometheusService::init(config); + signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); }); } int @@ -169,6 +173,10 @@ ClioApplication::run(bool const useNgWebServer) return EXIT_FAILURE; } + appStopper_.setOnStop( + Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc) + ); + // Blocks until stopped. 
// When stopped, shared_ptrs fall out of scope // Calls destructors on all resources, and destructs in order diff --git a/src/app/ClioApplication.hpp b/src/app/ClioApplication.hpp index 65b8f5f81..1eac02aae 100644 --- a/src/app/ClioApplication.hpp +++ b/src/app/ClioApplication.hpp @@ -19,6 +19,7 @@ #pragma once +#include "app/Stopper.hpp" #include "util/SignalsHandler.hpp" #include "util/newconfig/ConfigDefinition.hpp" @@ -30,6 +31,7 @@ namespace app { class ClioApplication { util::config::ClioConfigDefinition const& config_; util::SignalsHandler signalsHandler_; + Stopper appStopper_; public: /** diff --git a/src/app/Stopper.cpp b/src/app/Stopper.cpp new file mode 100644 index 000000000..90da5cde3 --- /dev/null +++ b/src/app/Stopper.cpp @@ -0,0 +1,52 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#include "app/Stopper.hpp" + +#include + +#include +#include +#include + +namespace app { + +Stopper::~Stopper() +{ + if (worker_.joinable()) + worker_.join(); +} + +void +Stopper::setOnStop(std::function cb) +{ + boost::asio::spawn(ctx_, std::move(cb)); +} + +void +Stopper::stop() +{ + // Do nothing if worker_ is already running + if (worker_.joinable()) + return; + + worker_ = std::thread{[this]() { ctx_.run(); }}; +} + +} // namespace app diff --git a/src/app/Stopper.hpp b/src/app/Stopper.hpp new file mode 100644 index 000000000..b4faa1377 --- /dev/null +++ b/src/app/Stopper.hpp @@ -0,0 +1,118 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "etl/ETLService.hpp" +#include "etl/LoadBalancer.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "util/CoroutineGroup.hpp" +#include "util/log/Logger.hpp" +#include "web/ng/Server.hpp" + +#include +#include +#include + +#include +#include + +namespace app { + +/** + * @brief Application stopper class. On stop it will create a new thread to run all the shutdown tasks. + */ +class Stopper { + boost::asio::io_context ctx_; + std::thread worker_; + +public: + /** + * @brief Destroy the Stopper object + */ + ~Stopper(); + + /** + * @brief Set the callback to be called when the application is stopped. + * + * @param cb The callback to be called on application stop. + */ + void + setOnStop(std::function cb); + + /** + * @brief Stop the application and run the shutdown tasks. + */ + void + stop(); + + /** + * @brief Create a callback to be called on application stop. + * + * @param server The server to stop. + * @param balancer The load balancer to stop. + * @param etl The ETL service to stop. + * @param subscriptions The subscription manager to stop. + * @param backend The backend to stop. + * @param ioc The io_context to stop. + * @return The callback to be called on application stop. 
+ */ + template < + web::ng::SomeServer ServerType, + etl::SomeLoadBalancer LoadBalancerType, + etl::SomeETLService ETLServiceType> + static std::function + makeOnStopCallback( + ServerType& server, + LoadBalancerType& balancer, + ETLServiceType& etl, + feed::SubscriptionManagerInterface& subscriptions, + data::BackendInterface& backend, + boost::asio::io_context& ioc + ) + { + return [&](boost::asio::yield_context yield) { + util::CoroutineGroup coroutineGroup{yield}; + coroutineGroup.spawn(yield, [&server](auto innerYield) { + server.stop(innerYield); + LOG(util::LogService::info()) << "Server stopped"; + }); + coroutineGroup.spawn(yield, [&balancer](auto innerYield) { + balancer.stop(innerYield); + LOG(util::LogService::info()) << "LoadBalancer stopped"; + }); + coroutineGroup.asyncWait(yield); + + etl.stop(); + LOG(util::LogService::info()) << "ETL stopped"; + + subscriptions.stop(); + LOG(util::LogService::info()) << "SubscriptionManager stopped"; + + backend.waitForWritesToFinish(); + LOG(util::LogService::info()) << "Backend writes finished"; + + ioc.stop(); + LOG(util::LogService::info()) << "io_context stopped"; + }; + } +}; + +} // namespace app diff --git a/src/data/BackendInterface.hpp b/src/data/BackendInterface.hpp index 2d7e3ef61..48771bae1 100644 --- a/src/data/BackendInterface.hpp +++ b/src/data/BackendInterface.hpp @@ -683,6 +683,12 @@ class BackendInterface { bool finishWrites(std::uint32_t ledgerSequence); + /** + * @brief Wait for all pending writes to finish. 
+ */ + virtual void + waitForWritesToFinish() = 0; + /** * @brief Mark the migration status of a migrator as Migrated in the database * diff --git a/src/data/CassandraBackend.hpp b/src/data/CassandraBackend.hpp index ba8aacd7f..71e9ecf59 100644 --- a/src/data/CassandraBackend.hpp +++ b/src/data/CassandraBackend.hpp @@ -188,11 +188,16 @@ class BasicCassandraBackend : public BackendInterface { return {txns, {}}; } + void + waitForWritesToFinish() override + { + executor_.sync(); + } + bool doFinishWrites() override { - // wait for other threads to finish their writes - executor_.sync(); + waitForWritesToFinish(); if (!range_) { executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_); diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index 1dbbd94c6..f0e2a2fc3 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -42,6 +42,7 @@ #include #include +#include #include #include #include @@ -58,6 +59,16 @@ struct NFTsData; */ namespace etl { +/** + * @brief A tag class to help identify ETLService in templated code. + */ +struct ETLServiceTag { + virtual ~ETLServiceTag() = default; +}; + +template +concept SomeETLService = std::derived_from; + /** * @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the * databases. @@ -71,7 +82,7 @@ namespace etl { * the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring * to writing and from writing to monitoring, based on the activity of other processes running on different machines. */ -class ETLService { +class ETLService : public ETLServiceTag { // TODO: make these template parameters in ETLService using LoadBalancerType = LoadBalancer; using DataPipeType = etl::impl::ExtractionDataPipe; @@ -159,10 +170,20 @@ class ETLService { /** * @brief Stops components and joins worker thread. 
*/ - ~ETLService() + ~ETLService() override + { + if (not state_.isStopping) + stop(); + } + + /** + * @brief Stop the ETL service. + * @note This method blocks until the ETL service has stopped. + */ + void + stop() { - LOG(log_.info()) << "onStop called"; - LOG(log_.debug()) << "Stopping Reporting ETL"; + LOG(log_.info()) << "Stop called"; state_.isStopping = true; cacheLoader_.stop(); diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index 6e961933c..7dc0d1925 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -26,6 +26,7 @@ #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" #include "util/Assert.hpp" +#include "util/CoroutineGroup.hpp" #include "util/Random.hpp" #include "util/ResponseExpirationCache.hpp" #include "util/log/Logger.hpp" @@ -336,6 +337,16 @@ LoadBalancer::getETLState() noexcept return etlState_; } +void +LoadBalancer::stop(boost::asio::yield_context yield) +{ + util::CoroutineGroup group{yield}; + std::ranges::for_each(sources_, [&group, yield](auto& source) { + group.spawn(yield, [&source](boost::asio::yield_context innerYield) { source->stop(innerYield); }); + }); + group.asyncWait(yield); +} + void LoadBalancer::chooseForwardingSource() { diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index 60f56a487..cfe01d019 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -41,6 +41,7 @@ #include #include +#include #include #include #include @@ -51,6 +52,16 @@ namespace etl { +/** + * @brief A tag class to help identify LoadBalancer in templated code. + */ +struct LoadBalancerTag { + virtual ~LoadBalancerTag() = default; +}; + +template +concept SomeLoadBalancer = std::derived_from; + /** * @brief This class is used to manage connections to transaction processing processes. * @@ -58,7 +69,7 @@ namespace etl { * which ledgers have been validated by the network, and the range of ledgers each etl source has). 
This class also * allows requests for ledger data to be load balanced across all possible ETL sources. */ -class LoadBalancer { +class LoadBalancer : public LoadBalancerTag { public: using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject; using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse; @@ -132,7 +143,7 @@ class LoadBalancer { SourceFactory sourceFactory = makeSource ); - ~LoadBalancer(); + ~LoadBalancer() override; /** * @brief Load the initial ledger, writing data to the queue. @@ -203,6 +214,15 @@ class LoadBalancer { std::optional getETLState() noexcept; + /** + * @brief Stop the load balancer. This will stop all subscription sources. + * @note This function will asynchronously wait for all sources to stop. + * + * @param yield The coroutine context + */ + void + stop(boost::asio::yield_context yield); + private: /** * @brief Execute a function on a randomly selected source. diff --git a/src/etl/Source.hpp b/src/etl/Source.hpp index e50849720..91d9bf817 100644 --- a/src/etl/Source.hpp +++ b/src/etl/Source.hpp @@ -65,6 +65,15 @@ class SourceBase { virtual void run() = 0; + /** + * @brief Stop Source. + * @note This method will asynchronously wait for source to be stopped. + * + * @param yield The coroutine context. 
+ */ + virtual void + stop(boost::asio::yield_context yield) = 0; + /** * @brief Check if source is connected * diff --git a/src/etl/impl/SourceImpl.hpp b/src/etl/impl/SourceImpl.hpp index 7302bfa4c..3f2f25a33 100644 --- a/src/etl/impl/SourceImpl.hpp +++ b/src/etl/impl/SourceImpl.hpp @@ -102,6 +102,12 @@ class SourceImpl : public SourceBase { subscriptionSource_->run(); } + void + stop(boost::asio::yield_context yield) final + { + subscriptionSource_->stop(yield); + } + /** * @brief Check if source is connected * diff --git a/src/etl/impl/SubscriptionSource.cpp b/src/etl/impl/SubscriptionSource.cpp index 1684cd23f..0dd081e23 100644 --- a/src/etl/impl/SubscriptionSource.cpp +++ b/src/etl/impl/SubscriptionSource.cpp @@ -49,7 +49,6 @@ #include #include #include -#include #include #include #include @@ -92,15 +91,6 @@ SubscriptionSource::SubscriptionSource( .setConnectionTimeout(wsTimeout_); } -SubscriptionSource::~SubscriptionSource() -{ - stop(); - retry_.cancel(); - - if (runFuture_.valid()) - runFuture_.wait(); -} - void SubscriptionSource::run() { @@ -157,59 +147,53 @@ SubscriptionSource::validatedRange() const } void -SubscriptionSource::stop() +SubscriptionSource::stop(boost::asio::yield_context yield) { stop_ = true; + stopHelper_.asyncWaitForStop(yield); } void SubscriptionSource::subscribe() { - runFuture_ = boost::asio::spawn( - strand_, - [this, _ = boost::asio::make_work_guard(strand_)](boost::asio::yield_context yield) { - auto connection = wsConnectionBuilder_.connect(yield); - if (not connection) { - handleError(connection.error(), yield); - return; - } - + boost::asio::spawn(strand_, [this, _ = boost::asio::make_work_guard(strand_)](boost::asio::yield_context yield) { + if (auto connection = wsConnectionBuilder_.connect(yield); connection) { wsConnection_ = std::move(connection).value(); + } else { + handleError(connection.error(), yield); + return; + } - auto const& subscribeCommand = getSubscribeCommandJson(); - auto const writeErrorOpt = 
wsConnection_->write(subscribeCommand, yield, wsTimeout_); - if (writeErrorOpt) { - handleError(writeErrorOpt.value(), yield); - return; - } + auto const& subscribeCommand = getSubscribeCommandJson(); - isConnected_ = true; - LOG(log_.info()) << "Connected"; - onConnect_(); + if (auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_); writeErrorOpt) { + handleError(writeErrorOpt.value(), yield); + return; + } - retry_.reset(); + isConnected_ = true; + LOG(log_.info()) << "Connected"; + onConnect_(); - while (!stop_) { - auto const message = wsConnection_->read(yield, wsTimeout_); - if (not message) { - handleError(message.error(), yield); - return; - } + retry_.reset(); - auto const handleErrorOpt = handleMessage(message.value()); - if (handleErrorOpt) { - handleError(handleErrorOpt.value(), yield); - return; - } + while (!stop_) { + auto const message = wsConnection_->read(yield, wsTimeout_); + if (not message) { + handleError(message.error(), yield); + return; } - // Close the connection - handleError( - util::requests::RequestError{"Subscription source stopped", boost::asio::error::operation_aborted}, - yield - ); - }, - boost::asio::use_future - ); + + if (auto const handleErrorOpt = handleMessage(message.value()); handleErrorOpt) { + handleError(handleErrorOpt.value(), yield); + return; + } + } + // Close the connection + handleError( + util::requests::RequestError{"Subscription source stopped", boost::asio::error::operation_aborted}, yield + ); + }); } std::optional @@ -299,6 +283,8 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost logError(error); if (not stop_) { retry_.retry([this] { subscribe(); }); + } else { + stopHelper_.readyToStop(); } } diff --git a/src/etl/impl/SubscriptionSource.hpp b/src/etl/impl/SubscriptionSource.hpp index 77bdf4b0e..d57cb76b6 100644 --- a/src/etl/impl/SubscriptionSource.hpp +++ b/src/etl/impl/SubscriptionSource.hpp @@ -24,6 +24,7 @@ #include 
"feed/SubscriptionManagerInterface.hpp" #include "util/Mutex.hpp" #include "util/Retry.hpp" +#include "util/StopHelper.hpp" #include "util/log/Logger.hpp" #include "util/prometheus/Gauge.hpp" #include "util/requests/Types.hpp" @@ -39,7 +40,6 @@ #include #include #include -#include #include #include #include @@ -50,6 +50,7 @@ namespace etl::impl { /** * @brief This class is used to subscribe to a source of ledger data and forward it to the subscription manager. + * @note This class is safe to delete only if io_context is stopped. */ class SubscriptionSource { public: @@ -89,7 +90,7 @@ class SubscriptionSource { std::reference_wrapper lastMessageTimeSecondsSinceEpoch_; - std::future runFuture_; + util::StopHelper stopHelper_; static constexpr std::chrono::seconds kWS_TIMEOUT{30}; static constexpr std::chrono::seconds kRETRY_MAX_DELAY{30}; @@ -124,13 +125,6 @@ class SubscriptionSource { std::chrono::steady_clock::duration const retryDelay = SubscriptionSource::kRETRY_DELAY ); - /** - * @brief Destroy the Subscription Source object - * - * @note This will block to wait for all the async operations to complete. io_context must be still running - */ - ~SubscriptionSource(); - /** * @brief Run the source */ @@ -192,7 +186,7 @@ class SubscriptionSource { * @brief Stop the source. The source will complete already scheduled operations but will not schedule new ones */ void - stop(); + stop(boost::asio::yield_context yield); private: void diff --git a/src/feed/SubscriptionManager.hpp b/src/feed/SubscriptionManager.hpp index 096b7fb2d..d839a672f 100644 --- a/src/feed/SubscriptionManager.hpp +++ b/src/feed/SubscriptionManager.hpp @@ -115,6 +115,15 @@ class SubscriptionManager : public SubscriptionManagerInterface { * @brief Destructor of the SubscriptionManager object. It will block until all running jobs finished. */ ~SubscriptionManager() override + { + stop(); + } + + /** + * @brief Stop the SubscriptionManager and wait for all jobs to finish. 
+ */ + void + stop() override { ctx_.stop(); ctx_.join(); diff --git a/src/feed/SubscriptionManagerInterface.hpp b/src/feed/SubscriptionManagerInterface.hpp index c627bc53c..c0efd7f91 100644 --- a/src/feed/SubscriptionManagerInterface.hpp +++ b/src/feed/SubscriptionManagerInterface.hpp @@ -45,6 +45,12 @@ class SubscriptionManagerInterface { public: virtual ~SubscriptionManagerInterface() = default; + /** + * @brief Stop the SubscriptionManager and wait for all jobs to finish. + */ + virtual void + stop() = 0; + /** * @brief Subscribe to the book changes feed. * @param subscriber diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 8c28fdf95..32e792e80 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -22,6 +22,7 @@ target_sources( requests/impl/SslContext.cpp ResponseExpirationCache.cpp SignalsHandler.cpp + StopHelper.cpp Taggable.cpp TerminationHandler.cpp TimeUtils.cpp diff --git a/src/util/StopHelper.cpp b/src/util/StopHelper.cpp new file mode 100644 index 000000000..928773d09 --- /dev/null +++ b/src/util/StopHelper.cpp @@ -0,0 +1,46 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#include "util/StopHelper.hpp" + +#include +#include + +#include + +namespace util { + +void +StopHelper::readyToStop() +{ + onStopReady_(); + *stopped_ = true; +} + +void +StopHelper::asyncWaitForStop(boost::asio::yield_context yield) +{ + boost::asio::steady_timer timer{yield.get_executor(), std::chrono::steady_clock::duration::max()}; + onStopReady_.connect([&timer]() { timer.cancel(); }); + boost::system::error_code error; + if (!*stopped_) + timer.async_wait(yield[error]); +} + +} // namespace util diff --git a/src/util/StopHelper.hpp b/src/util/StopHelper.hpp new file mode 100644 index 000000000..226352b46 --- /dev/null +++ b/src/util/StopHelper.hpp @@ -0,0 +1,54 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include +#include +#include + +#include +#include + +namespace util { + +/** + * @brief Helper class to stop a class asynchronously. 
+ */ +class StopHelper { + boost::signals2::signal onStopReady_; + std::unique_ptr stopped_ = std::make_unique(false); + +public: + /** + * @brief Notify that the class is ready to stop. + */ + void + readyToStop(); + + /** + * @brief Wait for the class to stop. + * + * @param yield The coroutine context + */ + void + asyncWaitForStop(boost::asio::yield_context yield); +}; + +} // namespace util diff --git a/src/web/ng/Server.cpp b/src/web/ng/Server.cpp index 11f78c176..70ba28ba3 100644 --- a/src/web/ng/Server.cpp +++ b/src/web/ng/Server.cpp @@ -27,6 +27,7 @@ #include "web/ng/Connection.hpp" #include "web/ng/MessageHandler.hpp" #include "web/ng/ProcessingPolicy.hpp" +#include "web/ng/Response.hpp" #include "web/ng/impl/HttpConnection.hpp" #include "web/ng/impl/ServerSslContext.hpp" @@ -42,6 +43,7 @@ #include #include #include +#include #include #include @@ -120,7 +122,7 @@ detectSsl(boost::asio::ip::tcp::socket socket, boost::asio::yield_context yield) return SslDetectionResult{.socket = tcpStream.release_socket(), .isSsl = isSsl, .buffer = std::move(buffer)}; } -std::expected> +std::expected> makeConnection( SslDetectionResult sslDetectionResult, std::optional& sslContext, @@ -133,7 +135,7 @@ makeConnection( impl::UpgradableConnectionPtr connection; if (sslDetectionResult.isSsl) { if (not sslContext.has_value()) - return std::unexpected{"SSL is not supported by this server"}; + return std::unexpected{"Error creating a connection: SSL is not supported by this server"}; connection = std::make_unique( std::move(sslDetectionResult.socket), @@ -157,7 +159,17 @@ makeConnection( connection->close(yield); return std::unexpected{std::nullopt}; } + return connection; +} +std::expected +tryUpgradeConnection( + impl::UpgradableConnectionPtr connection, + std::optional& sslContext, + util::TagDecoratorFactory& tagDecoratorFactory, + boost::asio::yield_context yield +) +{ auto const expectedIsUpgrade = connection->isUpgradeRequested(yield); if (not 
expectedIsUpgrade.has_value()) { return std::unexpected{ @@ -256,8 +268,9 @@ Server::run() } void -Server::stop() +Server::stop(boost::asio::yield_context yield) { + connectionHandler_.stop(yield); } void @@ -288,15 +301,32 @@ Server::handleConnection(boost::asio::ip::tcp::socket socket, boost::asio::yield ); if (not connectionExpected.has_value()) { if (connectionExpected.error().has_value()) { - LOG(log_.info()) << "Error creating a connection: " << *connectionExpected.error(); + LOG(log_.info()) << *connectionExpected.error(); } return; } LOG(log_.trace()) << connectionExpected.value()->tag() << "Connection created"; + if (connectionHandler_.isStopping()) { + boost::asio::spawn( + ctx_.get(), + [connection = std::move(connectionExpected).value()](boost::asio::yield_context yield) { + web::ng::impl::ConnectionHandler::stopConnection(*connection, yield); + } + ); + return; + } + + auto connection = + tryUpgradeConnection(std::move(connectionExpected).value(), sslContext_, tagDecoratorFactory_, yield); + if (not connection.has_value()) { + LOG(log_.info()) << connection.error(); + return; + } + boost::asio::spawn( ctx_.get(), - [this, connection = std::move(connectionExpected).value()](boost::asio::yield_context yield) mutable { + [this, connection = std::move(connection).value()](boost::asio::yield_context yield) mutable { connectionHandler_.processConnection(std::move(connection), yield); } ); diff --git a/src/web/ng/Server.hpp b/src/web/ng/Server.hpp index 220ebd293..f719b8dd2 100644 --- a/src/web/ng/Server.hpp +++ b/src/web/ng/Server.hpp @@ -33,6 +33,7 @@ #include #include +#include #include #include #include @@ -40,10 +41,20 @@ namespace web::ng { +/** + * @brief A tag class for server to help identify Server in templated code. + */ +struct ServerTag { + virtual ~ServerTag() = default; +}; + +template +concept SomeServer = std::derived_from; + /** * @brief Web server class. 
*/ -class Server { +class Server : public ServerTag { public: /** * @brief Check to perform for each new client connection. The check takes client ip as input and returns a Response @@ -147,11 +158,13 @@ class Server { run(); /** - * @brief Stop the server. - ** @note Stopping the server cause graceful shutdown of all connections. And rejecting new connections. + * @brief Stop the server. This method will asynchronously sleep unless all the users are disconnected. + * @note Stopping the server cause graceful shutdown of all connections. And rejecting new connections. + * + * @param yield The coroutine context. */ void - stop(); + stop(boost::asio::yield_context yield); private: void diff --git a/src/web/ng/impl/ConnectionHandler.cpp b/src/web/ng/impl/ConnectionHandler.cpp index 2a672ba3f..27e8bc655 100644 --- a/src/web/ng/impl/ConnectionHandler.cpp +++ b/src/web/ng/impl/ConnectionHandler.cpp @@ -35,10 +35,13 @@ #include #include #include +#include +#include #include #include #include +#include #include #include #include @@ -138,8 +141,23 @@ ConnectionHandler::onWs(MessageHandler handler) void ConnectionHandler::processConnection(ConnectionPtr connectionPtr, boost::asio::yield_context yield) { + LOG(log_.trace()) << connectionPtr->tag() << "New connection"; auto& connectionRef = *connectionPtr; - auto signalConnection = onStop_.connect([&connectionRef, yield]() { connectionRef.close(yield); }); + + if (isStopping()) { + stopConnection(connectionRef, yield); + return; + } + ++connectionsCounter_.get(); + + // Using coroutine group here to wait for stopConnection() to finish before exiting this function and destroying + // connection. 
+ util::CoroutineGroup stopTask{yield, 1}; + auto stopSignalConnection = onStop_.connect([&connectionRef, &stopTask, yield]() { + stopTask.spawn(yield, [&connectionRef](boost::asio::yield_context innerYield) { + stopConnection(connectionRef, innerYield); + }); + }); bool shouldCloseGracefully = false; @@ -173,21 +191,57 @@ ConnectionHandler::processConnection(ConnectionPtr connectionPtr, boost::asio::y } if (shouldCloseGracefully) { + connectionRef.setTimeout(kCLOSE_CONNECTION_TIMEOUT); connectionRef.close(yield); LOG(log_.trace()) << connectionRef.tag() << "Closed gracefully"; } - signalConnection.disconnect(); + stopSignalConnection.disconnect(); LOG(log_.trace()) << connectionRef.tag() << "Signal disconnected"; onDisconnectHook_(connectionRef); LOG(log_.trace()) << connectionRef.tag() << "Processing finished"; + + // Wait for a stopConnection() to finish if there is any to not have dangling reference in stopConnection(). + stopTask.asyncWait(yield); + + --connectionsCounter_.get(); + if (connectionsCounter_.get().value() == 0 && *stopping_) // dereference: stopping_ is a pointer to the flag and is itself never null + stopHelper_.readyToStop(); +} + +void +ConnectionHandler::stopConnection(Connection& connection, boost::asio::yield_context yield) +{ + util::Logger log{"WebServer"}; + LOG(log.trace()) << connection.tag() << "Stopping connection"; + Response response{ + boost::beast::http::status::service_unavailable, + "This Clio node is shutting down. 
Please try another node.", + connection + }; + connection.send(std::move(response), yield); + connection.setTimeout(kCLOSE_CONNECTION_TIMEOUT); + connection.close(yield); + LOG(log.trace()) << connection.tag() << "Connection closed"; } void -ConnectionHandler::stop() +ConnectionHandler::stop(boost::asio::yield_context yield) { + *stopping_ = true; onStop_(); + if (connectionsCounter_.get().value() == 0) + return; + + // Wait for server to disconnect all the users + stopHelper_.asyncWaitForStop(yield); +} + +bool +ConnectionHandler::isStopping() const +{ + return *stopping_; } bool @@ -211,7 +265,7 @@ ConnectionHandler::handleError(Error const& error, Connection const& connection) // Therefore, if we see a short read here, it has occurred // after the message has been completed, so it is safe to ignore it. if (error == boost::beast::http::error::end_of_stream || error == boost::asio::ssl::error::stream_truncated || - error == boost::asio::error::eof) + error == boost::asio::error::eof || error == boost::beast::error::timeout) return false; // WebSocket connection was gracefully closed @@ -308,7 +362,10 @@ ConnectionHandler::parallelRequestResponseLoop( ); } } + LOG(log_.trace()) << connection.tag() + << "Waiting processing tasks to finish. 
Number of tasks: " << tasksGroup.size(); tasksGroup.asyncWait(yield); + LOG(log_.trace()) << connection.tag() << "Processing is done"; return closeConnectionGracefully; } diff --git a/src/web/ng/impl/ConnectionHandler.hpp b/src/web/ng/impl/ConnectionHandler.hpp index 0572539a7..b3f17416a 100644 --- a/src/web/ng/impl/ConnectionHandler.hpp +++ b/src/web/ng/impl/ConnectionHandler.hpp @@ -19,8 +19,12 @@ #pragma once +#include "util/StopHelper.hpp" #include "util/Taggable.hpp" #include "util/log/Logger.hpp" +#include "util/prometheus/Gauge.hpp" +#include "util/prometheus/Label.hpp" +#include "util/prometheus/Prometheus.hpp" #include "web/SubscriptionContextInterface.hpp" #include "web/ng/Connection.hpp" #include "web/ng/Error.hpp" @@ -33,8 +37,11 @@ #include #include +#include +#include #include #include +#include #include #include #include @@ -77,6 +84,12 @@ class ConnectionHandler { std::optional wsHandler_; boost::signals2::signal onStop_; + std::unique_ptr stopping_ = std::make_unique(false); + + std::reference_wrapper connectionsCounter_ = + PrometheusService::gaugeInt("connections_total_number", util::prometheus::Labels{{{"status", "connected"}}}); + + util::StopHelper stopHelper_; public: ConnectionHandler( @@ -87,6 +100,8 @@ class ConnectionHandler { OnDisconnectHook onDisconnectHook ); + static constexpr std::chrono::milliseconds kCLOSE_CONNECTION_TIMEOUT{500}; + void onGet(std::string const& target, MessageHandler handler); @@ -99,8 +114,14 @@ class ConnectionHandler { void processConnection(ConnectionPtr connection, boost::asio::yield_context yield); + static void + stopConnection(Connection& connection, boost::asio::yield_context yield); + void - stop(); + stop(boost::asio::yield_context yield); + + bool + isStopping() const; private: /** diff --git a/src/web/ng/impl/HttpConnection.hpp b/src/web/ng/impl/HttpConnection.hpp index 6773ed200..b7bdfae6a 100644 --- a/src/web/ng/impl/HttpConnection.hpp +++ b/src/web/ng/impl/HttpConnection.hpp @@ -77,6 +77,7 @@ 
class HttpConnection : public UpgradableConnection { StreamType stream_; std::optional> request_; std::chrono::steady_clock::duration timeout_{kDEFAULT_TIMEOUT}; + bool closed_{false}; public: HttpConnection( @@ -152,6 +153,13 @@ class HttpConnection : public UpgradableConnection { void close(boost::asio::yield_context yield) override { + // This is needed because calling async_shutdown() multiple times may lead to hanging coroutines. + // See WsConnection for more details. + if (closed_) + return; + + closed_ = true; + [[maybe_unused]] boost::system::error_code error; if constexpr (IsSslTcpStream) { boost::beast::get_lowest_layer(stream_).expires_after(timeout_); diff --git a/src/web/ng/impl/WsConnection.hpp b/src/web/ng/impl/WsConnection.hpp index b921c037d..e13e51b66 100644 --- a/src/web/ng/impl/WsConnection.hpp +++ b/src/web/ng/impl/WsConnection.hpp @@ -64,6 +64,7 @@ template class WsConnection : public WsConnectionBase { boost::beast::websocket::stream stream_; boost::beast::http::request initialRequest_; + bool closed_{false}; public: WsConnection( @@ -159,6 +160,13 @@ class WsConnection : public WsConnectionBase { void close(boost::asio::yield_context yield) override { + if (closed_) + return; + + // This should be set before the async_close(). Otherwise there is a possibility to have multiple coroutines + // waiting on async_close(), but only one will be woken up after the actual close happened, others will hang. 
+ closed_ = true; + boost::system::error_code error; // unused stream_.async_close(boost::beast::websocket::close_code::normal, yield[error]); } diff --git a/tests/common/util/AsioContextTestFixture.hpp b/tests/common/util/AsioContextTestFixture.hpp index 715a53ddf..274491009 100644 --- a/tests/common/util/AsioContextTestFixture.hpp +++ b/tests/common/util/AsioContextTestFixture.hpp @@ -21,10 +21,14 @@ #include "util/LoggerFixtures.hpp" +#include #include #include +#include #include +#include #include +#include #include #include @@ -94,6 +98,38 @@ struct SyncAsioContextTest : virtual public NoLoggerFixture { runContext(); } + template + void + runSpawnWithTimeout(std::chrono::steady_clock::duration timeout, F&& f, bool allowMockLeak = false) + { + using namespace boost::asio; + + boost::asio::io_context timerCtx; + steady_timer timer{timerCtx, timeout}; + spawn(timerCtx, [this, &timer](yield_context yield) { + boost::system::error_code errorCode; + timer.async_wait(yield[errorCode]); + ctx_.stop(); + EXPECT_TRUE(false) << "Test timed out"; + }); + std::thread timerThread{[&timerCtx]() { timerCtx.run(); }}; + + testing::MockFunction call; + if (allowMockLeak) + testing::Mock::AllowLeak(&call); + + spawn(ctx_, [&](yield_context yield) { + f(yield); + call.Call(); + }); + + EXPECT_CALL(call, Call()); + runContext(); + + timerCtx.stop(); + timerThread.join(); + } + void runContext() { @@ -108,6 +144,15 @@ struct SyncAsioContextTest : virtual public NoLoggerFixture { ctx_.reset(); } + template + static void + runSyncOperation(F&& f) + { + boost::asio::io_service ioc; + boost::asio::spawn(ioc, f); + ioc.run(); + } + protected: boost::asio::io_context ctx_; }; diff --git a/tests/common/util/MockBackend.hpp b/tests/common/util/MockBackend.hpp index 0e11251af..bc8a1f434 100644 --- a/tests/common/util/MockBackend.hpp +++ b/tests/common/util/MockBackend.hpp @@ -211,6 +211,8 @@ struct MockBackend : public BackendInterface { MOCK_METHOD(void, doWriteLedgerObject, 
(std::string&&, std::uint32_t const, std::string&&), (override)); + MOCK_METHOD(void, waitForWritesToFinish, (), (override)); + MOCK_METHOD(bool, doFinishWrites, (), (override)); MOCK_METHOD(void, writeMPTHolders, (std::vector const&), (override)); diff --git a/tests/common/util/MockSource.hpp b/tests/common/util/MockSource.hpp index 04adef923..58984d485 100644 --- a/tests/common/util/MockSource.hpp +++ b/tests/common/util/MockSource.hpp @@ -23,7 +23,6 @@ #include "etl/Source.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" -#include "util/newconfig/ConfigDefinition.hpp" #include "util/newconfig/ObjectView.hpp" #include @@ -49,6 +48,7 @@ struct MockSource : etl::SourceBase { MOCK_METHOD(void, run, (), (override)); + MOCK_METHOD(void, stop, (boost::asio::yield_context), (override)); MOCK_METHOD(bool, isConnected, (), (const, override)); MOCK_METHOD(void, setForwarding, (bool), (override)); MOCK_METHOD(boost::json::object, toJson, (), (const, override)); @@ -89,6 +89,12 @@ class MockSourceWrapper : public etl::SourceBase { mock_->run(); } + void + stop(boost::asio::yield_context yield) override + { + mock_->stop(yield); + } + bool isConnected() const override { diff --git a/tests/common/util/MockSubscriptionManager.hpp b/tests/common/util/MockSubscriptionManager.hpp index 186fbdba5..f595e02f7 100644 --- a/tests/common/util/MockSubscriptionManager.hpp +++ b/tests/common/util/MockSubscriptionManager.hpp @@ -102,6 +102,8 @@ struct MockSubscriptionManager : feed::SubscriptionManagerInterface { MOCK_METHOD(void, unsubProposedTransactions, (feed::SubscriberSharedPtr const&), (override)); MOCK_METHOD(boost::json::object, report, (), (const, override)); + + MOCK_METHOD(void, stop, (), (override)); }; template