diff --git a/include/ylt/coro_io/channel.hpp b/include/ylt/coro_io/channel.hpp index a7962c97c..e78f7da9d 100644 --- a/include/ylt/coro_io/channel.hpp +++ b/include/ylt/coro_io/channel.hpp @@ -166,11 +166,14 @@ class channel { config)) { std::shared_ptr client_pool; if (client_pools_.size() > 1) { - client_pool = co_await std::visit( - [this](auto& worker) { - return worker(*this); - }, - lb_worker); + int cnt = 0; + do { + client_pool = co_await std::visit( + [this](auto& worker) { + return worker(*this); + }, + lb_worker); + } while (!client_pool->is_alive() && ++cnt <= size() * 2); } else { client_pool = client_pools_[0]; diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index a67c9008c..f93bfd2c0 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -113,6 +113,21 @@ class client_pool : public std::enable_shared_from_this< return std::chrono::milliseconds{static_cast(e(r) * ms.count())}; } + static async_simple::coro::Lazy> + reconnect_impl(std::unique_ptr& client, + std::shared_ptr& self) { + auto pre_time_point = std::chrono::steady_clock::now(); + auto result = co_await client->connect(self->host_name_); + bool ok = client_t::is_ok(result); + auto post_time_point = std::chrono::steady_clock::now(); + auto cost_time = std::chrono::duration_cast( + post_time_point - pre_time_point); + ELOG_TRACE << "reconnect client{" << client.get() + << "} cost time: " << cost_time / std::chrono::milliseconds{1} + << "ms"; + co_return std::pair{ok, cost_time}; + } + static async_simple::coro::Lazy reconnect( std::unique_ptr& client, std::weak_ptr watcher) { using namespace std::chrono_literals; @@ -123,13 +138,7 @@ class client_pool : public std::enable_shared_from_this< << client->get_host() << ":" << client->get_port() << "}, try count:" << i + 1 << "max retry limit:" << self->pool_config_.connect_retry_count; - auto pre_time_point = std::chrono::steady_clock::now(); - bool ok = client_t::is_ok(co_await client->connect(self->host_name_)); - auto post_time_point = std::chrono::steady_clock::now(); - auto cost_time = post_time_point - pre_time_point; - ELOG_TRACE << "reconnect client{" << client.get() - << "} cost time: " << cost_time / std::chrono::milliseconds{1} - << "ms"; + auto [ok, cost_time] = co_await reconnect_impl(client, self); if (ok) { ELOG_TRACE << "reconnect client{" << client.get() << "} success"; co_return; @@ -148,9 +157,69 @@ class client_pool : public std::enable_shared_from_this< ELOG_WARN << "reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() << "} out of max limit, stop retry. connect failed"; + alive_detect(client->get_config(), std::move(self)).start([](auto&&) { + }); client = nullptr; } + static async_simple::coro::Lazy alive_detect( + const typename client_t::config& client_config, + std::weak_ptr watcher) { + std::shared_ptr self = watcher.lock(); + using namespace std::chrono_literals; + if (self && self->pool_config_.host_alive_detect_duration.count() != 0 && + self->free_client_count() == 0) { + bool expected = true; + if (!self->is_alive_.compare_exchange_strong( + expected, false)) { // other alive detect coroutine is running. + co_return; + } + if (self->free_client_count() > 0) { // recheck for multi-thread + self->is_alive_ = true; + co_return; + } + auto executor = self->io_context_pool_.get_executor(); + auto client = std::make_unique(*executor); + if (!client->init_config(client_config)) + AS_UNLIKELY { + ELOG_ERROR << "Init client config failed in host alive detect. That " + "is not expected."; + co_return; + } + while (true) { + auto [ok, cost_time] = co_await reconnect_impl(client, self); + if (ok) { + ELOG_TRACE << "reconnect client{" << client.get() + << "} success. stop alive detect."; + self->collect_free_client(std::move(client)); + self->is_alive_ = + true; /*if client close(), we still mark it as alive*/ + co_return; + } + if (self->is_alive_) { + ELOG_TRACE << "client pool is aliving, stop connect client {" + << client.get() << "} for alive detect"; + co_return; + } + ELOG_TRACE << "reconnect client{" << client.get() + << "} failed. continue alive detect."; + auto wait_time = rand_time( + (self->pool_config_.host_alive_detect_duration - cost_time) / 1ms * + 1ms); + self = nullptr; + if (wait_time.count() > 0) { + co_await coro_io::sleep_for(wait_time, &client->get_executor()); + } + self = watcher.lock(); + if (self->is_alive_) { + ELOG_TRACE << "client pool is aliving, stop connect client {" + << client.get() << "} for alive detect"; + co_return; + } + } + } + } + async_simple::coro::Lazy> get_client( const typename client_t::config& client_config) { std::unique_ptr client; @@ -208,6 +277,7 @@ class client_pool : public std::enable_shared_from_this< enqueue(short_connect_clients_, std::move(client), pool_config_.short_connect_idle_timeout); } + is_alive_ = true; } else { ELOG_TRACE << "client{" << client.get() @@ -245,6 +315,8 @@ class client_pool : public std::enable_shared_from_this< std::chrono::milliseconds reconnect_wait_time{1000}; std::chrono::milliseconds idle_timeout{30000}; std::chrono::milliseconds short_connect_idle_timeout{1000}; + std::chrono::milliseconds host_alive_detect_duration{ + 30000}; /* zero means wont detect */ typename client_t::config client_config; }; @@ -312,6 +384,12 @@ class client_pool : public std::enable_shared_from_this< std::size_t free_client_count() const noexcept { return free_clients_.size() + short_connect_clients_.size(); } + /** + * @brief if host may not useable now. + * + * @return bool + */ + bool is_alive() const noexcept { return is_alive_; } /** * @brief approx connection of client pools @@ -368,6 +446,7 @@ class client_pool : public std::enable_shared_from_this< std::string host_name_; pool_config pool_config_; io_context_pool_t& io_context_pool_; + std::atomic is_alive_ = true; }; template { : coro_http_client(executor->get_asio_executor()) {} bool init_config(const config &conf) { + config_ = conf; if (conf.conn_timeout_duration.has_value()) { set_conn_timeout(*conf.conn_timeout_duration); } @@ -207,6 +208,8 @@ class coro_http_client : public std::enable_shared_from_this { coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; } + const config &get_config() { return config_; } + #ifdef CINATRA_ENABLE_SSL bool init_ssl(int verify_mode, const std::string &base_path, const std::string &cert_file, const std::string &sni_hostname) { @@ -2432,6 +2435,7 @@ class coro_http_client : public std::enable_shared_from_this { std::string resp_chunk_str_; std::span out_buf_; bool should_reset_ = false; + config config_; #ifdef CINATRA_ENABLE_GZIP bool enable_ws_deflate_ = false; diff --git a/src/coro_io/tests/test_channel.cpp b/src/coro_io/tests/test_channel.cpp index eb688fcac..8561ea509 100644 --- a/src/coro_io/tests/test_channel.cpp +++ b/src/coro_io/tests/test_channel.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -16,6 +17,7 @@ #include #include "async_simple/coro/Lazy.h" +#include "ylt/coro_io/client_pool.hpp" #include "ylt/coro_rpc/impl/coro_rpc_client.hpp" #include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp" @@ -184,4 +186,109 @@ TEST_CASE("test send_request config") { } server.stop(); }()); +} + +void hello() {} + +TEST_CASE("test server down") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server1(1, 58801); + server1.register_handler(); + auto res = server1.async_start(); + REQUIRE_MESSAGE(!res.hasResult(), "server start failed"); + coro_rpc::coro_rpc_server server2(1, 58802); + server2.register_handler(); + auto res2 = server2.async_start(); + REQUIRE_MESSAGE(!res2.hasResult(), "server start failed"); + auto hosts = + std::vector{"127.0.0.1:58801", "127.0.0.1:58802"}; + auto config = coro_io::client_pool::pool_config{ + .connect_retry_count = 0, + .reconnect_wait_time = std::chrono::milliseconds{0}, + .host_alive_detect_duration = std::chrono::milliseconds{500}}; + auto channel = + coro_io::channel::create(hosts, {config}); + + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + CHECK(host == hosts[i % 2]); + co_return; + }); + CHECK(res.has_value()); + } + server1.stop(); + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + co_await client.call(); + if (i > 0) + CHECK(host == hosts[1]); + co_return; + }); + if (i > 2) + CHECK(res.has_value()); + } + server2.stop(); + { + { + auto res = co_await channel.send_request( + [](coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + co_await client.call(); + co_return; + }); + res = co_await channel.send_request( + [](coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + co_await client.call(); + co_return; + }); + CHECK(!res.has_value()); + } + } + + coro_rpc::coro_rpc_server server3(1, 58801); + server3.register_handler(); + auto res3 = server3.async_start(); + REQUIRE_MESSAGE(!res3.hasResult(), "server start failed"); + co_await coro_io::sleep_for(std::chrono::seconds{1}); + { + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + CHECK(host == hosts[0]); + co_return; + }); + CHECK(res.has_value()); + } + } + coro_rpc::coro_rpc_server server4(1, 58802); + server4.register_handler(); + auto res4 = server4.async_start(); + REQUIRE_MESSAGE(!res4.hasResult(), "server start failed"); + co_await coro_io::sleep_for(std::chrono::seconds{1}); + { + int counter = 0; + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts, &counter]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + if (host == hosts[1]) { + ++counter; + } + co_return; + }); + CHECK(res.has_value()); + } + CHECK(counter == 50); + } + }()); } \ No newline at end of file