diff --git a/.gitignore b/.gitignore index 9d5a668d4b..a23d5b9f78 100644 --- a/.gitignore +++ b/.gitignore @@ -139,6 +139,7 @@ test_base85 test_bind_after_connect_tcp test_sodium test_zmq_poll_fd +test_reconnect_ivl tests/test*.log tests/test*.trs src/platform.hpp* diff --git a/Makefile.am b/Makefile.am index 54c37dbac7..f5a7bf8709 100644 --- a/Makefile.am +++ b/Makefile.am @@ -418,6 +418,7 @@ test_apps = \ tests/test_base85 \ tests/test_bind_after_connect_tcp \ tests/test_sodium \ + tests/test_reconnect_ivl \ tests/test_socket_null tests_test_ancillaries_SOURCES = tests/test_ancillaries.cpp @@ -638,6 +639,9 @@ tests_test_sodium_LDADD = src/libzmq.la tests_test_socket_null_SOURCES = tests/test_socket_null.cpp tests_test_socket_null_LDADD = src/libzmq.la +tests_test_reconnect_ivl_SOURCES = tests/test_reconnect_ivl.cpp +tests_test_reconnect_ivl_LDADD = src/libzmq.la + if HAVE_CURVE test_apps += \ diff --git a/src/command.hpp b/src/command.hpp index a7eaf07218..ddb46d4675 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -30,6 +30,7 @@ #ifndef __ZMQ_COMMAND_HPP_INCLUDED__ #define __ZMQ_COMMAND_HPP_INCLUDED__ +#include #include "stdint.hpp" namespace zmq @@ -69,6 +70,7 @@ namespace zmq term_req, term, term_ack, + term_endpoint, reap, reaped, inproc_connected, @@ -153,6 +155,12 @@ namespace zmq struct { } term_ack; + // Sent by session_base (I/O thread) to socket (application thread) + // to ask to disconnect the endpoint. + struct { + std::string *endpoint; + } term_endpoint; + // Transfers the ownership of the closed socket // to the reaper thread. struct { diff --git a/src/object.cpp b/src/object.cpp index 982bc95b66..1e07226fcd 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -134,6 +134,10 @@ void zmq::object_t::process_command (command_t &cmd_) process_term_ack (); break; + case command_t::term_endpoint: + process_term_endpoint (cmd_.args.term_endpoint.endpoint); + break; + case command_t::reap: process_reap (cmd_.args.reap.socket); break; @@ -332,6 +336,16 @@ void zmq::object_t::send_term_ack (own_t *destination_) send_command (cmd); } +void zmq::object_t::send_term_endpoint (own_t *destination_, + std::string *endpoint_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::term_endpoint; + cmd.args.term_endpoint.endpoint = endpoint_; + send_command (cmd); +} + void zmq::object_t::send_reap (class socket_base_t *socket_) { command_t cmd; @@ -435,6 +449,11 @@ void zmq::object_t::process_term_ack () zmq_assert (false); } +void zmq::object_t::process_term_endpoint (std::string *) +{ + zmq_assert (false); +} + void zmq::object_t::process_reap (class socket_base_t *) { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 97755c8819..90e12dd6a4 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -107,6 +107,7 @@ namespace zmq zmq::own_t *object_); void send_term (zmq::own_t *destination_, int linger_); void send_term_ack (zmq::own_t *destination_); + void send_term_endpoint (own_t *destination_, std::string *endpoint_); void send_reap (zmq::socket_base_t *socket_); void send_reaped (); void send_done (); @@ -127,6 +128,7 @@ namespace zmq virtual void process_term_req (zmq::own_t *object_); virtual void process_term (int linger_); virtual void process_term_ack (); + virtual void process_term_endpoint (std::string *endpoint_); virtual void process_reap (zmq::socket_base_t *socket_); virtual void process_reaped (); diff --git a/src/session_base.cpp b/src/session_base.cpp index 7316dfff4d..683c4d6778 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -536,6 +536,11 @@ void zmq::session_base_t::reconnect () // Reconnect. if (options.reconnect_ivl != -1) start_connecting (true); + else { + std::string *ep = new (std::string); + addr->to_string (*ep); + send_term_endpoint (socket, ep); + } // For subscriber sockets we hiccup the inbound pipe, which will cause // the socket object to resend all the subscriptions. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2e504952ee..876581c3c6 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1425,6 +1425,12 @@ void zmq::socket_base_t::process_term (int linger_) own_t::process_term (linger_); } +void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_) +{ + term_endpoint (endpoint_->c_str()); + delete endpoint_; +} + void zmq::socket_base_t::update_pipe_options(int option_) { if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index b54af708bc..f5a26568e0 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -250,6 +250,7 @@ namespace zmq void process_stop (); void process_bind (zmq::pipe_t *pipe_); void process_term (int linger_); + void process_term_endpoint (std::string *endpoint_); void update_pipe_options(int option_); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index cbb7b959a8..7b9a54c7e8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -70,6 +70,7 @@ set(tests test_sodium test_monitor test_socket_null + test_reconnect_ivl ) if(ZMQ_HAVE_CURVE) list(APPEND tests diff --git a/tests/test_reconnect_ivl.cpp b/tests/test_reconnect_ivl.cpp new file mode 100644 index 0000000000..87b07d696d --- /dev/null +++ b/tests/test_reconnect_ivl.cpp @@ -0,0 +1,149 @@ +/* + Copyright (c) 2017 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + + +#ifndef ZMQ_HAVE_WINDOWS +void test_reconnect_ivl_ipc (void) +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_PAIR); + assert (sb); + int rc = zmq_bind (sb, "ipc:///tmp/test_reconnect_ivl"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_PAIR); + assert (sc); + int interval = -1; + rc = zmq_setsockopt (sc, ZMQ_RECONNECT_IVL, &interval, sizeof (int)); + assert (rc == 0); + rc = zmq_connect (sc, "ipc:///tmp/test_reconnect_ivl"); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_unbind (sb, "ipc:///tmp/test_reconnect_ivl"); + assert (rc == 0); + + expect_bounce_fail (sb, sc); + + rc = zmq_bind (sb, "ipc:///tmp/test_reconnect_ivl"); + assert (rc == 0); + + expect_bounce_fail (sb, sc); + + rc = zmq_connect (sc, "ipc:///tmp/test_reconnect_ivl"); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} +#endif + +void test_reconnect_ivl_tcp (const char *address) +{ + size_t len = MAX_SOCKET_STRING; + char my_endpoint[MAX_SOCKET_STRING]; + void *ctx = zmq_ctx_new (); + assert (ctx); + + if (streq (address, "tcp://[::1]:*")) { + if (is_ipv6_available ()) { + zmq_ctx_set(ctx, ZMQ_IPV6, 1); + } else { + zmq_ctx_term (ctx); + return; + } + } + + void *sb = zmq_socket (ctx, ZMQ_PAIR); + assert (sb); + int rc = zmq_bind (sb, address); + assert (rc == 0); + rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_PAIR); + assert (sc); + int interval = -1; + rc = zmq_setsockopt (sc, ZMQ_RECONNECT_IVL, &interval, sizeof (int)); + assert (rc == 0); + rc = zmq_connect (sc, my_endpoint); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_unbind (sb, my_endpoint); + assert (rc == 0); + + expect_bounce_fail (sb, sc); + + rc = zmq_bind (sb, my_endpoint); + assert (rc == 0); + + expect_bounce_fail (sb, sc); + + rc = zmq_connect (sc, my_endpoint); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +int main (void) +{ + setup_test_environment (); + +#ifndef ZMQ_HAVE_WINDOWS + test_reconnect_ivl_ipc (); +#endif + test_reconnect_ivl_tcp ("tcp://127.0.0.1:*"); + test_reconnect_ivl_tcp ("tcp://[::1]:*"); + + return 0 ; +}