Skip to content

Commit

Permalink
refactoring tx
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex0vSky committed Jun 3, 2024
1 parent 219ba3c commit f7e3e8c
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 58 deletions.
18 changes: 9 additions & 9 deletions src/net/tx/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@

namespace net::tx {
Base::Base() :
m_ioContext( std::make_unique< boost::asio::io_context >( 1 ) )
m_ioContext( 1 )
, c_endpointClient{ boost::asio::ip::address_v4( c_host ), c_port }
, c_endpointServer{ tcp::v4( ), c_port }
, m_acceptor( m_ioContext, c_endpointServer )
{}

void Base::update(function_t client, function_t server) {
m_stop = false;
bool m_stop = false;
co_spawn_( [this, server]() mutable ->awaitable {
co_await server( (Exchanger *)this );
co_await server( reinterpret_cast< Exchanger * >( this ) );
} );
co_spawn_( [this, client]() mutable ->awaitable {
co_await client( (Exchanger *)this );
co_spawn_( [this, client, &m_stop]() mutable ->awaitable {
co_await client( reinterpret_cast< Exchanger * >( this ) );
m_stop = true;
} );
while ( true ) {
m_ioContext ->run_one_for( std::chrono::milliseconds{ 300 } );
m_ioContext.run_one_for( std::chrono::milliseconds{ 300 } );
if ( m_stop ) {
cancelAcceptor_( );
m_ioContext ->run( );
m_acceptor.cancel( );
m_ioContext.run( );
break;
}
}
Expand Down
46 changes: 19 additions & 27 deletions src/net/tx/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,38 @@ class Exchanger;
*/
class Base {
using function_t = std::function< boost::asio::awaitable< void > (Exchanger *) >;
bool m_stop = false;

public:
using awaitable = boost::asio::awaitable< void >;
using tcp = boost::asio::ip::tcp;

protected:
static constexpr u_short c_port = 55555;
static constexpr boost::asio::ip::address_v4::bytes_type c_host{ 127, 0, 0, 1 };
std::unique_ptr< boost::asio::io_context > m_ioContext;
const tcp::endpoint c_endpointClient, c_endpointServer;

// aliases
template <typename... Args>
static constexpr auto as_tuple(Args&&... args) {
return boost::asio::experimental::as_tuple( std::forward< Args >( args )... );
}
static constexpr auto c_detached = boost::asio::detached;

template <typename F>
inline BOOST_ASIO_INITFN_AUTO_RESULT_TYPE( decltype( c_detached ),
typename detail::awaitable_signature<typename result_of<F()>::type>::type)
co_spawn_(F&& f) {
return co_spawn( m_ioContext ->get_executor( ), std::forward<F>(f), c_detached );
}
public:
using awaitable = boost::asio::awaitable< void >;
using tcp = boost::asio::ip::tcp;
/**
* Call client and server simultaneously
*/
void update(function_t client, function_t server);

protected:
/**
* Create network transmission
*/
Base();
virtual void cancelAcceptor_()
{}
virtual ~Base()
{}

public:
boost::asio::io_context m_ioContext;
const tcp::endpoint c_endpointClient, c_endpointServer{ tcp::v4( ), c_port };
tcp::acceptor m_acceptor;
static constexpr auto c_detached = boost::asio::detached;
static constexpr auto c_tuple = as_tuple( boost::asio::use_awaitable_t{ } );
/**
* Call client and server simultaneously
*/
void update(function_t client, function_t server);

template <typename F>
inline BOOST_ASIO_INITFN_AUTO_RESULT_TYPE( decltype( c_detached ),
typename detail::awaitable_signature<typename result_of<F()>::type>::type)
co_spawn_(F&& f) {
return co_spawn( m_ioContext.get_executor( ), std::forward<F>(f), c_detached );
}
};
} // namespace net::tx
6 changes: 3 additions & 3 deletions src/net/tx/exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ boost::asio::awaitable<bool> Exchanger::clientSide(Commander::Command command, u
boost::asio::awaitable<void> Exchanger::Holder::finish() {
//printf( "[Exchanger::serverSide] acceptor...\n" ); //
while ( true ) {
auto [error1, socket] = co_await m_acceptor.async_accept( c_tuple );
if ( boost::asio::error::operation_aborted == error1 ) // due `m_acceptor.cancel( )`
auto [error1, socket] = co_await m_acceptorPtr ->async_accept( c_tuple );
if ( boost::asio::error::operation_aborted == error1 ) // due `m_acceptorPtr ->cancel( )`
break;
if ( !socket.is_open( ) || error1 )
continue;
auto it = m_commandsBuffer.find( co_await readCommand_( socket ) );
if ( it == m_commandsBuffer.end( ) )
continue;
cista::byte_buf buffer = it ->second;
auto [error2, nwritten] = co_await boost::asio::async_write( socket, boost::asio::buffer( buffer ), c_tuple );
auto [error2, nwritten] = co_await boost::asio::async_write( socket, boost::asio::buffer( buffer ), Base::c_tuple );
if ( buffer.size( ) != nwritten )
continue;
//printf( "[Exchanger::serverSide] sent: %d\n", nwritten ); //
Expand Down
27 changes: 8 additions & 19 deletions src/net/tx/exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,21 @@ class Exchanger : public Commander {
using unit_t = cista::byte_buf;
class Holder {
std::unordered_map< Commander::Command, unit_t > m_commandsBuffer;
boost::asio::io_context *m_context;
tcp::acceptor m_acceptor;
tcp::acceptor *m_acceptorPtr;

public:
Holder(boost::asio::io_context *context, tcp::endpoint endpoint) :
m_context( context ), m_acceptor( *context, endpoint )
{}
Exchanger::Holder *Exchanger::Holder::on(Commander::Command command, unit_t const& buffer) {
explicit Holder(tcp::acceptor *acceptor) : m_acceptorPtr( acceptor ) {}
[[nodiscard]] Exchanger::Holder *Exchanger::Holder::on(Commander::Command command, unit_t const& buffer) {
return m_commandsBuffer.emplace( command, buffer ), this;
}
void cancel() {
m_acceptor.cancel( );
}
boost::asio::awaitable<void> finish();
[[nodiscard]] boost::asio::awaitable<void> finish();
};
using holder_t = std::shared_ptr< Holder >;
holder_t m_holder;
void cancelAcceptor_() override {
if ( m_holder ) m_holder ->cancel( ), m_holder.reset( );
}

public:
boost::asio::awaitable<bool> clientSide(Commander::Command, unit_t*);

Exchanger::holder_t Exchanger::serverSide() {
return m_holder = std::make_shared< Holder >( Base::m_ioContext.get( ), c_endpointServer );
[[nodiscard]] boost::asio::awaitable<bool> clientSide(Commander::Command, unit_t*);
auto Exchanger::serverSide() {
auto p = &m_acceptor;
return std::make_shared< Holder >( p );
}
};
} // namespace net::tx

0 comments on commit f7e3e8c

Please sign in to comment.