diff --git a/lib/ZeroMQChannel.cpp b/lib/ZeroMQChannel.cpp index bb38091f4..f037610a9 100644 --- a/lib/ZeroMQChannel.cpp +++ b/lib/ZeroMQChannel.cpp @@ -13,6 +13,7 @@ using namespace sairedis; #define ZMQ_RESPONSE_BUFFER_SIZE (4*1024*1024) +#define ZMQ_MAX_RETRY 10 ZeroMQChannel::ZeroMQChannel( _In_ const std::string& endpoint, @@ -219,14 +220,22 @@ void ZeroMQChannel::set( SWSS_LOG_DEBUG("sending: %s", msg.c_str()); - int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0); - - if (rc <= 0) + for (int i = 0; true ; ++i) { - SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d: %s", - m_endpoint.c_str(), - zmq_errno(), - zmq_strerror(zmq_errno())); + int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0); + + if (rc <= 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY) + { + continue; + } + if (rc <= 0) + { + SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d: %s", + m_endpoint.c_str(), + zmq_errno(), + zmq_strerror(zmq_errno())); + } + break; } } @@ -238,23 +247,7 @@ void ZeroMQChannel::del( std::vector values; - swss::FieldValueTuple opdata(key, command); - - values.insert(values.begin(), opdata); - - std::string msg = swss::JSon::buildJson(values); - - SWSS_LOG_DEBUG("sending: %s", msg.c_str()); - - int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0); - - if (rc <= 0) - { - SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d: %s", - m_endpoint.c_str(), - zmq_errno(), - zmq_strerror(zmq_errno())); - } + set(key, values, command); } sai_status_t ZeroMQChannel::wait( @@ -270,35 +263,51 @@ sai_status_t ZeroMQChannel::wait( items[0].socket = m_socket; items[0].events = ZMQ_POLLIN; - int rc = zmq_poll(items, 1, (int)m_responseTimeoutMs); + int rc; - if (rc == 0) + for (int i = 0; true ; ++i) { - SWSS_LOG_ERROR("zmq_poll timed out for: %s", command.c_str()); + rc = zmq_poll(items, 1, (int)m_responseTimeoutMs); - // notice, at this point we could throw, since in REP/REQ pattern - // we are forced to use send/recv in that specific order + if (rc == 0) + { + SWSS_LOG_ERROR("zmq_poll timed out for: %s", command.c_str()); - return SAI_STATUS_FAILURE; - } + // notice, at this point we could throw, since in REP/REQ pattern + // we are forced to use send/recv in that specific order - if (rc < 0) - { - SWSS_LOG_THROW("zmq_poll failed, zmqerrno: %d", zmq_errno()); + return SAI_STATUS_FAILURE; + } + if (rc < 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY) + { + continue; + } + if (rc < 0) + { + SWSS_LOG_THROW("zmq_poll failed, zmqerrno: %d", zmq_errno()); + } + break; } - rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); - - if (rc < 0) + for (int i = 0; true ; ++i) { - SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); - } + rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); - if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) - { - SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", - ZMQ_RESPONSE_BUFFER_SIZE, - rc); + if (rc < 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY) + { + continue; + } + if (rc < 0) + { + SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); + } + if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) + { + SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", + ZMQ_RESPONSE_BUFFER_SIZE, + rc); + } + break; } m_buffer.at(rc) = 0; // make sure that we end string with zero before parse diff --git a/tests/tests.cpp b/tests/tests.cpp index e11805484..7214bda70 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -11,6 +11,8 @@ #include "meta/sai_serialize.h" +#include +#include #include #include @@ -19,6 +21,18 @@ using namespace sairedis; #define ASSERT_EQ(a,b) if ((a) != (b)) { SWSS_LOG_THROW("ASSERT EQ FAILED: " #a " != " #b); } +#define ASSERT_THROW(a,b) \ + try { \ + a; \ + SWSS_LOG_ERROR("ASSERT_THROW FAILED"); \ + exit(1); \ + } \ + catch(const b &e) { \ + } \ + catch(...) { \ + SWSS_LOG_THROW("ASSERT_THROW FAILED"); \ + } + /* * Test if destructor proper clean and join zeromq socket and context, and * break recv method. @@ -86,13 +100,54 @@ static void test_zeromqchannel_first_notification() } } +void send_signals() +{ + SWSS_LOG_ENTER(); + pid_t pid = getpid(); + for (int i = 0; i < 11; ++i) + { + sleep(1); + kill(pid, SIGHUP); + } +}; + +/* + * Test if runtime_error will be thrown if zmq wait reaches max retry due to + * signal interrupt. + */ +static void test_zeromqchannel_eintr_errno_on_wait() +{ + SWSS_LOG_ENTER(); + + std::cout << " * " << __FUNCTION__ << std::endl; + + ZeroMQChannel z("ipc:///tmp/feeds1", "ipc:///tmp/feeds2", nullptr); + z.setResponseTimeout(60000); + + std::thread signal_thread(send_signals); + + swss::KeyOpFieldsValuesTuple kco; + ASSERT_THROW(z.wait("foo", kco), std::runtime_error); + + signal_thread.join(); +} + +void sighup_handler(int signo) +{ + SWSS_LOG_ENTER(); +} + int main() { SWSS_LOG_ENTER(); + signal(SIGHUP, sighup_handler); + test_zeromqchannel_destructor(); test_zeromqchannel_first_notification(); + test_zeromqchannel_eintr_errno_on_wait(); + return 0; }