From 066fdaa27aba6df54612ee79d71d40300ad7c4b8 Mon Sep 17 00:00:00 2001 From: Florian Reimold <11774314+FlorianReimold@users.noreply.github.com> Date: Thu, 24 Oct 2024 09:46:11 +0200 Subject: [PATCH] Started implementing proper-close for sockets. The main use-case (simply close the ftp server without having a running data transfer) should already lead to proper results. However, there will be some memory leaks. --- fineftp-server/src/ftp_session.cpp | 40 ++++++----- fineftp-server/src/ftp_session.h | 1 + fineftp-server/src/server_impl.cpp | 103 ++++++++++++++++++++--------- fineftp-server/src/server_impl.h | 10 ++- 4 files changed, 102 insertions(+), 52 deletions(-) diff --git a/fineftp-server/src/ftp_session.cpp b/fineftp-server/src/ftp_session.cpp index 3be83a9..68905c2 100755 --- a/fineftp-server/src/ftp_session.cpp +++ b/fineftp-server/src/ftp_session.cpp @@ -53,18 +53,38 @@ namespace fineftp } FtpSession::~FtpSession() + { + stop(); + completion_handler_(); + } + + void FtpSession::start() + { + asio::error_code ec; + command_socket_.set_option(asio::ip::tcp::no_delay(true), ec); + if (ec) std::cerr << "Unable to set socket option tcp::no_delay: " << ec.message() << std::endl; + + command_strand_.post([me = shared_from_this()]() { me->readFtpCommand(); }); + sendFtpMessage(FtpMessage(FtpReplyCode::SERVICE_READY_FOR_NEW_USER, "Welcome to fineFTP Server")); + } + + // TODO: I need to check if the command socket AND data socket is always shutdown and closed. + void FtpSession::stop() { #ifndef NDEBUG + // TODO: Have a "is stopped" variable, so that this log message is not printed every time std::cout << "Ftp Session shutting down" << std::endl; #endif // !NDEBUG + // TODO: protect the two sockets with mutexes, as it is now possible to call stop() from another thread!!! + { // Properly close command socket. // When the FtpSession is being destroyed, there are no std::shared_ptr's referring to // it and hence no possibility of race conditions on command_socket_. asio::error_code ec; - command_socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); - command_socket_.close(ec); + command_socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter + command_socket_.close(ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter } // When the FtpSession is being destroyed, there are no std::shared_ptr's referring to @@ -74,21 +94,9 @@ namespace fineftp { // Properly close data socket asio::error_code ec; - data_socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec); - data_socket->close(ec); + data_socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter + data_socket->close(ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter } - - completion_handler_(); - } - - void FtpSession::start() - { - asio::error_code ec; - command_socket_.set_option(asio::ip::tcp::no_delay(true), ec); - if (ec) std::cerr << "Unable to set socket option tcp::no_delay: " << ec.message() << std::endl; - - command_strand_.post([me = shared_from_this()]() { me->readFtpCommand(); }); - sendFtpMessage(FtpMessage(FtpReplyCode::SERVICE_READY_FOR_NEW_USER, "Welcome to fineFTP Server")); } asio::ip::tcp::socket& FtpSession::getSocket() diff --git a/fineftp-server/src/ftp_session.h b/fineftp-server/src/ftp_session.h index 5a08556..8d4b6ac 100755 --- a/fineftp-server/src/ftp_session.h +++ b/fineftp-server/src/ftp_session.h @@ -46,6 +46,7 @@ namespace fineftp ~FtpSession(); void start(); + void stop(); asio::ip::tcp::socket& getSocket(); diff --git a/fineftp-server/src/server_impl.cpp b/fineftp-server/src/server_impl.cpp index 0d8b151..b38515e 100644 --- a/fineftp-server/src/server_impl.cpp +++ b/fineftp-server/src/server_impl.cpp @@ -20,7 +20,6 @@ namespace fineftp : port_ (port) , address_ (address) , acceptor_ (io_service_) - , open_connection_count_(0) {} FtpServerImpl::~FtpServerImpl() @@ -40,8 +39,6 @@ namespace fineftp bool FtpServerImpl::start(size_t thread_count) { - auto ftp_session = std::make_shared(io_service_, ftp_users_, [this]() { open_connection_count_--; }); - // set up the acceptor to listen on the tcp port asio::error_code make_address_ec; const asio::ip::tcp::endpoint endpoint(asio::ip::make_address(address_, make_address_ec), port_); @@ -52,6 +49,8 @@ namespace fineftp } { + const std::lock_guard acceptor_lock(acceptor_mutex_); + asio::error_code ec; acceptor_.open(endpoint.protocol(), ec); if (ec) @@ -62,6 +61,8 @@ namespace fineftp } { + const std::lock_guard acceptor_lock(acceptor_mutex_); + asio::error_code ec; acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true), ec); if (ec) @@ -72,6 +73,8 @@ namespace fineftp } { + const std::lock_guard acceptor_lock(acceptor_mutex_); + asio::error_code ec; acceptor_.bind(endpoint, ec); if (ec) @@ -82,6 +85,8 @@ namespace fineftp } { + const std::lock_guard acceptor_lock(acceptor_mutex_); + asio::error_code ec; acceptor_.listen(asio::socket_base::max_listen_connections, ec); if (ec) @@ -92,16 +97,13 @@ namespace fineftp } #ifndef NDEBUG - std::cout << "FTP Server created." << std::endl << "Listening at address " << acceptor_.local_endpoint().address() << " on port " << acceptor_.local_endpoint().port() << ":" << std::endl; + { + const std::lock_guard acceptor_lock(acceptor_mutex_); + std::cout << "FTP Server created." << std::endl << "Listening at address " << acceptor_.local_endpoint().address() << " on port " << acceptor_.local_endpoint().port() << ":" << std::endl; + } #endif // NDEBUG - acceptor_.async_accept(ftp_session->getSocket() - , [this, ftp_session](auto ec) - { - open_connection_count_++; - - acceptFtpSession(ftp_session, ec); - }); + waitForNextFtpSession(); for (size_t i = 0; i < thread_count; i++) { @@ -113,52 +115,87 @@ namespace fineftp void FtpServerImpl::stop() { - io_service_.stop(); - for (std::thread& thread : thread_pool_) + // Prevent new sessions from being created { - thread.join(); + const std::lock_guard acceptor_lock(acceptor_mutex_); + + // Close acceptor, if necessary + if (acceptor_.is_open()) + { + asio::error_code ec; + acceptor_.close(ec); // NOLINT(bugprone-unused-return-value) -> We already get the return value rom the ec parameter + } + } + + // Stop all sessions + { + const std::lock_guard session_list_lock(session_list_mutex_); + for(const auto& session_weak : session_list_) + { + const auto session = session_weak.lock(); + if (session) + session->stop(); + } + } + + // Wait for the io_context to run out of work by joining all threads + { + for (std::thread& thread : thread_pool_) + { + thread.join(); + } + thread_pool_.clear(); } - thread_pool_.clear(); } - void FtpServerImpl::acceptFtpSession(const std::shared_ptr& ftp_session, asio::error_code const& error) + void FtpServerImpl::waitForNextFtpSession() { - if (error) + // TODO: create proper shutdown callback as lambda + + auto shutdown_callback = [this]() { }; + + auto new_ftp_session = std::make_shared(io_service_, ftp_users_, shutdown_callback); + { -#ifndef NDEBUG - std::cerr << "Error handling connection: " << error.message() << std::endl; -#endif - return; - } + const std::lock_guard acceptor_lock(acceptor_mutex_); + acceptor_.async_accept(new_ftp_session->getSocket() + , [this, new_ftp_session](asio::error_code ec) // TODO: replace this with weak ptr to this + { + if (ec) + { + std::cerr << "Error accepting connection: " << ec.message() << std::endl; + return; + } #ifndef NDEBUG - std::cout << "FTP Client connected: " << ftp_session->getSocket().remote_endpoint().address().to_string() << ":" << ftp_session->getSocket().remote_endpoint().port() << std::endl; + std::cout << "FTP Client connected: " << new_ftp_session->getSocket().remote_endpoint().address().to_string() << ":" << new_ftp_session->getSocket().remote_endpoint().port() << std::endl; #endif + const std::lock_guard session_list_lock(this->session_list_mutex_); + this->session_list_.push_back(new_ftp_session); - ftp_session->start(); - - auto new_session = std::make_shared(io_service_, ftp_users_, [this]() { open_connection_count_--; }); + new_ftp_session->start(); - acceptor_.async_accept(new_session->getSocket() - , [this, new_session](auto ec) - { - open_connection_count_++; - acceptFtpSession(new_session, ec); - }); + waitForNextFtpSession(); + }); + } } int FtpServerImpl::getOpenConnectionCount() { - return open_connection_count_; + const std::lock_guard session_list_lock(session_list_mutex_); + // TODO: 2024-10-23: Check if closed sessions can be in this list and if I need to iterate over this list to count the open ones, only + return session_list_.size(); } uint16_t FtpServerImpl::getPort() { + const std::lock_guard acceptor_lock(acceptor_mutex_); return acceptor_.local_endpoint().port(); } std::string FtpServerImpl::getAddress() { + const std::lock_guard acceptor_lock(acceptor_mutex_); return acceptor_.local_endpoint().address().to_string(); } } diff --git a/fineftp-server/src/server_impl.h b/fineftp-server/src/server_impl.h index 0a3a90d..ecf5616 100644 --- a/fineftp-server/src/server_impl.h +++ b/fineftp-server/src/server_impl.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -46,7 +47,7 @@ namespace fineftp std::string getAddress(); private: - void acceptFtpSession(const std::shared_ptr& ftp_session, asio::error_code const& error); + void waitForNextFtpSession(); private: UserDatabase ftp_users_; @@ -56,8 +57,11 @@ namespace fineftp std::vector thread_pool_; asio::io_service io_service_; - asio::ip::tcp::acceptor acceptor_; - std::atomic open_connection_count_; + mutable std::mutex acceptor_mutex_; //!< Mutex protecting the acceptor. That is necessary, as the user may stop the server (and therefore close the acceptor) from another thread. + asio::ip::tcp::acceptor acceptor_; //!< The acceptor waiting for new sessions + + mutable std::mutex session_list_mutex_; //!< Mutex protecting the list of current sessions + std::vector> session_list_; //!< List of sessions. Only store weak_ptr, so the sessions can delete themselves. This list is used to stop sessions and count connections }; }