From afd0f10d6350b9753ad6d00a760fce150d90bbf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=9Fingen?= Date: Thu, 9 Nov 2017 16:02:18 +0100 Subject: [PATCH] lib: Address ZMQ lib TODOs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add write callback. Add error callback. Add frrzmq_check_events() function to check for edge triggered things that may have happened after a zmq_send() call or so. Update ZMQ tests. Signed-off-by: ßingen --- lib/frr_zmq.c | 288 +++++++++++++++++++++++++++++--------- lib/frr_zmq.h | 77 +++++++--- tests/lib/test_zmq.c | 151 +++++++++++++++++--- tests/lib/test_zmq.refout | 49 ++++--- 4 files changed, 447 insertions(+), 118 deletions(-) diff --git a/lib/frr_zmq.c b/lib/frr_zmq.c index 861f7a5f0c2e..d4df5130e7e3 100644 --- a/lib/frr_zmq.c +++ b/lib/frr_zmq.c @@ -47,46 +47,43 @@ void frrzmq_finish(void) } } -/* read callback integration */ -struct frrzmq_cb { - struct thread *thread; - void *zmqsock; - void *arg; - int fd; - - bool cancelled; - - void (*cb_msg)(void *arg, void *zmqsock); - void (*cb_part)(void *arg, void *zmqsock, - zmq_msg_t *msg, unsigned partnum); -}; - - static int frrzmq_read_msg(struct thread *t) { - struct frrzmq_cb *cb = THREAD_ARG(t); + struct frrzmq_cb **cbp = THREAD_ARG(t); + struct frrzmq_cb *cb; zmq_msg_t msg; unsigned partno; + unsigned char read = 0; int ret, more; size_t moresz; + if (!cbp) + return 1; + cb = (*cbp); + if (!cb || !cb->zmqsock) + return 1; + while (1) { - zmq_pollitem_t polli = { - .socket = cb->zmqsock, - .events = ZMQ_POLLIN - }; + zmq_pollitem_t polli = {.socket = cb->zmqsock, + .events = ZMQ_POLLIN}; ret = zmq_poll(&polli, 1, 0); if (ret < 0) goto out_err; + if (!(polli.revents & ZMQ_POLLIN)) break; - if (cb->cb_msg) { - cb->cb_msg(cb->arg, cb->zmqsock); + if (cb->read.cb_msg) { + cb->read.cb_msg(cb->read.arg, cb->zmqsock); + read = 1; - if (cb->cancelled) { - XFREE(MTYPE_ZEROMQ_CB, cb); + if (cb->read.cancelled) { + frrzmq_check_events(cbp, &cb->write, + ZMQ_POLLOUT); + cb->read.thread = NULL; + if (cb->write.cancelled && !cb->write.thread) + XFREE(MTYPE_ZEROMQ_CB, cb); return 0; } continue; @@ -104,11 +101,17 @@ static int frrzmq_read_msg(struct thread *t) zmq_msg_close(&msg); goto out_err; } + read = 1; - cb->cb_part(cb->arg, cb->zmqsock, &msg, partno); - if (cb->cancelled) { + cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg, + partno); + if (cb->read.cancelled) { zmq_msg_close(&msg); - XFREE(MTYPE_ZEROMQ_CB, cb); + frrzmq_check_events(cbp, &cb->write, + ZMQ_POLLOUT); + cb->read.thread = NULL; + if (cb->write.cancelled && !cb->write.thread) + XFREE(MTYPE_ZEROMQ_CB, cb); return 0; } @@ -116,8 +119,8 @@ static int frrzmq_read_msg(struct thread *t) * message; don't use zmq_msg_more here */ moresz = sizeof(more); more = 0; - ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, - &more, &moresz); + ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more, + &moresz); if (ret < 0) { zmq_msg_close(&msg); goto out_err; @@ -128,64 +131,221 @@ static int frrzmq_read_msg(struct thread *t) zmq_msg_close(&msg); } - funcname_thread_add_read_write(THREAD_READ, t->master, frrzmq_read_msg, - cb, cb->fd, &cb->thread, t->funcname, t->schedfrom, - t->schedfrom_line); + if (read) + frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT); + + funcname_thread_add_read_write( + THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd, + &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line); return 0; out_err: - zlog_err("ZeroMQ error: %s(%d)", strerror (errno), errno); - return 0; + zlog_err("ZeroMQ read error: %s(%d)", strerror(errno), errno); + if (cb->read.cb_error) + cb->read.cb_error(cb->read.arg, cb->zmqsock); + return 1; } -struct frrzmq_cb *funcname_frrzmq_thread_add_read( - struct thread_master *master, - void (*msgfunc)(void *arg, void *zmqsock), - void (*partfunc)(void *arg, void *zmqsock, - zmq_msg_t *msg, unsigned partnum), - void *arg, void *zmqsock, debugargdef) +int funcname_frrzmq_thread_add_read(struct thread_master *master, + void (*msgfunc)(void *arg, void *zmqsock), + void (*partfunc)(void *arg, void *zmqsock, + zmq_msg_t *msg, + unsigned partnum), + void (*errfunc)(void *arg, void *zmqsock), + void *arg, void *zmqsock, + struct frrzmq_cb **cbp, debugargdef) { int fd, events; size_t len; struct frrzmq_cb *cb; + if (!cbp) + return -1; if (!(msgfunc || partfunc) || (msgfunc && partfunc)) - return NULL; + return -1; + len = sizeof(fd); + if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) + return -1; + len = sizeof(events); + if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) + return -1; + + if (*cbp) + cb = *cbp; + else { + cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); + cb->write.cancelled = 1; + if (!cb) + return -1; + *cbp = cb; + } + + cb->zmqsock = zmqsock; + cb->fd = fd; + cb->read.arg = arg; + cb->read.cb_msg = msgfunc; + cb->read.cb_part = partfunc; + cb->read.cb_error = errfunc; + cb->read.cancelled = 0; + + if (events & ZMQ_POLLIN) { + if (cb->read.thread) { + thread_cancel(cb->read.thread); + cb->read.thread = NULL; + } + funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd, + &cb->read.thread, funcname, schedfrom, + fromln); + } else + funcname_thread_add_read_write( + THREAD_READ, master, frrzmq_read_msg, cbp, fd, + &cb->read.thread, funcname, schedfrom, fromln); + return 0; +} + +static int frrzmq_write_msg(struct thread *t) +{ + struct frrzmq_cb **cbp = THREAD_ARG(t); + struct frrzmq_cb *cb; + unsigned char written = 0; + int ret; + + if (!cbp) + return 1; + cb = (*cbp); + if (!cb || !cb->zmqsock) + return 1; + + while (1) { + zmq_pollitem_t polli = {.socket = cb->zmqsock, + .events = ZMQ_POLLOUT}; + ret = zmq_poll(&polli, 1, 0); + + if (ret < 0) + goto out_err; + + if (!(polli.revents & ZMQ_POLLOUT)) + break; + + if (cb->write.cb_msg) { + cb->write.cb_msg(cb->write.arg, cb->zmqsock); + written = 1; + + if (cb->write.cancelled) { + frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); + cb->write.thread = NULL; + if (cb->read.cancelled && !cb->read.thread) + XFREE(MTYPE_ZEROMQ_CB, cb); + return 0; + } + continue; + } + } + + if (written) + frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); + + funcname_thread_add_read_write(THREAD_WRITE, t->master, + frrzmq_write_msg, cbp, cb->fd, + &cb->write.thread, t->funcname, + t->schedfrom, t->schedfrom_line); + return 0; + +out_err: + zlog_err("ZeroMQ write error: %s(%d)", strerror(errno), errno); + if (cb->write.cb_error) + cb->write.cb_error(cb->write.arg, cb->zmqsock); + return 1; +} +int funcname_frrzmq_thread_add_write(struct thread_master *master, + void (*msgfunc)(void *arg, void *zmqsock), + void (*errfunc)(void *arg, void *zmqsock), + void *arg, void *zmqsock, + struct frrzmq_cb **cbp, debugargdef) +{ + int fd, events; + size_t len; + struct frrzmq_cb *cb; + + if (!cbp) + return -1; + if (!msgfunc) + return -1; len = sizeof(fd); if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) - return NULL; + return -1; len = sizeof(events); if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) - return NULL; + return -1; - cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); - if (!cb) - return NULL; + if (*cbp) + cb = *cbp; + else { + cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); + cb->read.cancelled = 1; + if (!cb) + return -1; + *cbp = cb; + } - cb->arg = arg; cb->zmqsock = zmqsock; - cb->cb_msg = msgfunc; - cb->cb_part = partfunc; cb->fd = fd; + cb->write.arg = arg; + cb->write.cb_msg = msgfunc; + cb->write.cb_part = NULL; + cb->write.cb_error = errfunc; + cb->write.cancelled = 0; + + if (events & ZMQ_POLLOUT) { + if (cb->write.thread) { + thread_cancel(cb->write.thread); + cb->write.thread = NULL; + } + funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd, + &cb->write.thread, funcname, + schedfrom, fromln); + } else + funcname_thread_add_read_write( + THREAD_WRITE, master, frrzmq_write_msg, cbp, fd, + &cb->write.thread, funcname, schedfrom, fromln); + return 0; +} - if (events & ZMQ_POLLIN) - funcname_thread_add_event(master, - frrzmq_read_msg, cb, fd, &cb->thread, - funcname, schedfrom, fromln); - else - funcname_thread_add_read_write(THREAD_READ, master, - frrzmq_read_msg, cb, fd, &cb->thread, - funcname, schedfrom, fromln); - return cb; +void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core) +{ + if (!cb || !*cb) + return; + core->cancelled = 1; + if (core->thread) { + thread_cancel(core->thread); + core->thread = NULL; + } + if ((*cb)->read.cancelled && !(*cb)->read.thread + && (*cb)->write.cancelled && (*cb)->write.thread) + XFREE(MTYPE_ZEROMQ_CB, *cb); } -void frrzmq_thread_cancel(struct frrzmq_cb *cb) +void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core, + int event) { - if (!cb->thread) { - /* canceling from within callback */ - cb->cancelled = 1; + struct frrzmq_cb *cb; + int events; + size_t len; + + if (!cbp) + return; + cb = (*cbp); + if (!cb || !cb->zmqsock) + return; + + if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len)) return; + if (events & event && core->thread && !core->cancelled) { + struct thread_master *tm = core->thread->master; + thread_cancel(core->thread); + core->thread = NULL; + thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg + : frrzmq_write_msg), + cbp, cb->fd, &core->thread); } - thread_cancel(cb->thread); - XFREE(MTYPE_ZEROMQ_CB, cb); } diff --git a/lib/frr_zmq.h b/lib/frr_zmq.h index 69c6f8580dbb..1146b879640c 100644 --- a/lib/frr_zmq.h +++ b/lib/frr_zmq.h @@ -33,6 +33,26 @@ * foo_LDFLAGS = libfrrzmq.la libfrr.la $(ZEROMQ_LIBS) */ +/* callback integration */ +struct cb_core { + struct thread *thread; + void *arg; + + bool cancelled; + + void (*cb_msg)(void *arg, void *zmqsock); + void (*cb_part)(void *arg, void *zmqsock, zmq_msg_t *msg, + unsigned partnum); + void (*cb_error)(void *arg, void *zmqsock); +}; +struct frrzmq_cb { + void *zmqsock; + int fd; + + struct cb_core read; + struct cb_core write; +}; + /* libzmq's context * * this is mostly here as a convenience, it has IPv6 enabled but nothing @@ -40,21 +60,27 @@ */ extern void *frrzmq_context; -extern void frrzmq_init (void); -extern void frrzmq_finish (void); +extern void frrzmq_init(void); +extern void frrzmq_finish(void); #define debugargdef const char *funcname, const char *schedfrom, int fromln /* core event registration, one of these 2 macros should be used */ -#define frrzmq_thread_add_read_msg(m,f,a,z) funcname_frrzmq_thread_add_read( \ - m,f,NULL,a,z,#f,__FILE__,__LINE__) -#define frrzmq_thread_add_read_part(m,f,a,z) funcname_frrzmq_thread_add_read( \ - m,NULL,f,a,z,#f,__FILE__,__LINE__) +#define frrzmq_thread_add_read_msg(m, f, e, a, z, d) \ + funcname_frrzmq_thread_add_read(m, f, NULL, e, a, z, d, #f, __FILE__, \ + __LINE__) +#define frrzmq_thread_add_read_part(m, f, e, a, z, d) \ + funcname_frrzmq_thread_add_read(m, NULL, f, e, a, z, d, #f, __FILE__, \ + __LINE__) +#define frrzmq_thread_add_write_msg(m, f, e, a, z, d) \ + funcname_frrzmq_thread_add_write(m, f, e, a, z, d, #f, __FILE__, \ + __LINE__) +struct cb_core; struct frrzmq_cb; -/* Set up a POLLIN notification to be called from the libfrr main loop. - * This has the following properties: +/* Set up a POLLIN or POLLOUT notification to be called from the libfrr main + * loop. This has the following properties: * * - since ZeroMQ works with edge triggered notifications, it will loop and * dispatch as many events as ZeroMQ has pending at the time libfrr calls @@ -67,22 +93,35 @@ struct frrzmq_cb; * - if partfunc is specified, the message is read and partfunc is called * for each ZeroMQ multi-part subpart. Note that you can't send replies * before all parts have been read because that violates the ZeroMQ FSM. + * - write version doesn't allow for partial callback, you must handle the + * whole message (all parts) in msgfunc callback * - you can safely cancel the callback from within itself * - installing a callback will check for pending events (ZMQ_EVENTS) and * may schedule the event to run as soon as libfrr is back in its main * loop. + */ +extern int funcname_frrzmq_thread_add_read( + struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock), + void (*partfunc)(void *arg, void *zmqsock, zmq_msg_t *msg, + unsigned partnum), + void (*errfunc)(void *arg, void *zmqsock), void *arg, void *zmqsock, + struct frrzmq_cb **cb, debugargdef); +extern int funcname_frrzmq_thread_add_write( + struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock), + void (*errfunc)(void *arg, void *zmqsock), void *arg, void *zmqsock, + struct frrzmq_cb **cb, debugargdef); + +extern void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core); + +/* + * http://api.zeromq.org/4-2:zmq-getsockopt#toc10 * - * TODO #1: add ZMQ_POLLERR / error callback - * TODO #2: add frrzmq_check_events() function to check for edge triggered - * things that may have happened after a zmq_send() call or so + * As the descriptor is edge triggered, applications must update the state of + * ZMQ_EVENTS after each invocation of zmq_send or zmq_recv.To be more explicit: + * after calling zmq_send the socket may become readable (and vice versa) + * without triggering a read event on the file descriptor. */ -extern struct frrzmq_cb *funcname_frrzmq_thread_add_read( - struct thread_master *master, - void (*msgfunc)(void *arg, void *zmqsock), - void (*partfunc)(void *arg, void *zmqsock, - zmq_msg_t *msg, unsigned partnum), - void *arg, void *zmqsock, debugargdef); - -extern void frrzmq_thread_cancel(struct frrzmq_cb *cb); +extern void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core, + int event); #endif /* _FRRZMQ_H */ diff --git a/tests/lib/test_zmq.c b/tests/lib/test_zmq.c index c270ec3d18f8..b6624915e874 100644 --- a/tests/lib/test_zmq.c +++ b/tests/lib/test_zmq.c @@ -23,6 +23,7 @@ #include "frr_zmq.h" DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer") +DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message") static struct thread_master *master; @@ -31,6 +32,25 @@ static void msg_buf_free(void *data, void *hint) XFREE(MTYPE_TESTBUF, data); } +static int recv_delim(void *zmqsock) +{ + /* receive delim */ + zmq_msg_t zdelim; + int more; + zmq_msg_init(&zdelim); + zmq_msg_recv(&zdelim, zmqsock, 0); + more = zmq_msg_more(&zdelim); + zmq_msg_close(&zdelim); + return more; +} +static void send_delim(void *zmqsock) +{ + /* Send delim */ + zmq_msg_t zdelim; + zmq_msg_init(&zdelim); + zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE); + zmq_msg_close(&zdelim); +} static void run_client(int syncfd) { int i, j; @@ -38,13 +58,14 @@ static void run_client(int syncfd) char dummy; void *zmqctx = NULL; void *zmqsock; + int more; read(syncfd, &dummy, 1); zmqctx = zmq_ctx_new(); zmq_ctx_set(zmqctx, ZMQ_IPV6, 1); - zmqsock = zmq_socket(zmqctx, ZMQ_REQ); + zmqsock = zmq_socket(zmqctx, ZMQ_DEALER); if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) { perror("zmq_connect"); exit(1); @@ -52,22 +73,28 @@ static void run_client(int syncfd) /* single-part */ for (i = 0; i < 8; i++) { - snprintf(buf, sizeof(buf), "msg #%d %c%c%c", - i, 'a' + i, 'b' + i, 'c' + i); + snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i, + 'b' + i, 'c' + i); printf("client send: %s\n", buf); fflush(stdout); - zmq_send(zmqsock, buf, strlen(buf) + 1, 0); - zmq_recv(zmqsock, buf, sizeof(buf), 0); - printf("client recv: %s\n", buf); + send_delim(zmqsock); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + more = recv_delim(zmqsock); + while (more) { + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("client recv: %s\n", buf); + size_t len = sizeof(more); + if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) + break; + } } /* multipart */ for (i = 2; i < 5; i++) { - int more; - printf("---\n"); + send_delim(zmqsock); + zmq_msg_t part; for (j = 1; j <= i; j++) { - zmq_msg_t part; char *dyn = XMALLOC(MTYPE_TESTBUF, 32); snprintf(dyn, 32, "part %d/%d", j, i); @@ -79,7 +106,7 @@ static void run_client(int syncfd) zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0); } - zmq_msg_t part; + recv_delim(zmqsock); do { char *data; @@ -90,26 +117,85 @@ static void run_client(int syncfd) } while (more); zmq_msg_close(&part); } + + /* write callback */ + printf("---\n"); + snprintf(buf, 32, "Done receiving"); + printf("client send: %s\n", buf); + fflush(stdout); + send_delim(zmqsock); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + /* wait for message from server */ + more = recv_delim(zmqsock); + while (more) { + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("client recv: %s\n", buf); + size_t len = sizeof(more); + if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) + break; + } + zmq_close(zmqsock); zmq_ctx_term(zmqctx); } static struct frrzmq_cb *cb; +static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) +{ + /* receive id */ + zmq_msg_init(msg_id); + zmq_msg_recv(msg_id, zmqsock, 0); + /* receive delim */ + recv_delim(zmqsock); +} +static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) +{ + /* Send Id */ + zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE); + send_delim(zmqsock); +} +static void serverwritefn(void *arg, void *zmqsock) +{ + zmq_msg_t *msg_id = (zmq_msg_t *)arg; + char buf[32] = "Test write callback"; + size_t i; + + for (i = 0; i < strlen(buf); i++) + buf[i] = toupper(buf[i]); + printf("server send: %s\n", buf); + fflush(stdout); + send_id_and_delim(zmqsock, msg_id); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + + /* send just once */ + frrzmq_thread_cancel(&cb, &cb->write); + + zmq_msg_close(msg_id); + XFREE(MTYPE_ZMQMSG, msg_id); +} static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, - unsigned partnum) + unsigned partnum) { + static int num = 0; int more = zmq_msg_more(msg); char *in = zmq_msg_data(msg); size_t i; zmq_msg_t reply; char *out; + /* Id */ + if (partnum == 0) { + send_id_and_delim(zmqsock, msg); + return; + } + /* Delim */ + if (partnum == 1) + return; + + printf("server recv part %u (more: %d): %s\n", partnum, more, in); fflush(stdout); - /* REQ-REP doesn't allow sending a reply here */ - if (more) - return; out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1); for (i = 0; i < strlen(in); i++) @@ -118,39 +204,66 @@ static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE); + if (more) + return; + out = XMALLOC(MTYPE_TESTBUF, 32); snprintf(out, 32, "msg# was %u", partnum); zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); zmq_msg_send(&reply, zmqsock, 0); + + zmq_msg_close(&reply); + + if (++num < 7) + return; + + /* write callback test */ + char buf[32]; + zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t)); + recv_id_and_delim(zmqsock, msg_id); + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("server recv: %s\n", buf); + fflush(stdout); + + frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id, + zmqsock, &cb); } static void serverfn(void *arg, void *zmqsock) { static int num = 0; + zmq_msg_t msg_id; char buf[32]; size_t i; + + recv_id_and_delim(zmqsock, &msg_id); zmq_recv(zmqsock, buf, sizeof(buf), 0); printf("server recv: %s\n", buf); fflush(stdout); for (i = 0; i < strlen(buf); i++) buf[i] = toupper(buf[i]); + send_id_and_delim(zmqsock, &msg_id); + zmq_msg_close(&msg_id); zmq_send(zmqsock, buf, strlen(buf) + 1, 0); if (++num < 4) return; /* change to multipart callback */ - frrzmq_thread_cancel(cb); + frrzmq_thread_cancel(&cb, &cb->read); + frrzmq_thread_cancel(&cb, &cb->write); - cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock); + frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock, + &cb); } static void sigchld(void) { printf("child exited.\n"); - frrzmq_thread_cancel(cb); + frrzmq_thread_cancel(&cb, &cb->read); + frrzmq_thread_cancel(&cb, &cb->write); } static struct quagga_signal_t sigs[] = { @@ -170,13 +283,13 @@ static void run_server(int syncfd) signal_init(master, array_size(sigs), sigs); frrzmq_init(); - zmqsock = zmq_socket(frrzmq_context, ZMQ_REP); + zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER); if (zmq_bind(zmqsock, "tcp://*:17171")) { perror("zmq_bind"); exit(1); } - cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock); + frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb); write(syncfd, &dummy, sizeof(dummy)); while (thread_fetch(master, &t)) diff --git a/tests/lib/test_zmq.refout b/tests/lib/test_zmq.refout index 61f45f02b137..acac50553d15 100644 --- a/tests/lib/test_zmq.refout +++ b/tests/lib/test_zmq.refout @@ -11,40 +11,57 @@ client send: msg #3 def server recv: msg #3 def client recv: MSG #3 DEF client send: msg #4 efg -server recv part 0 (more: 0): msg #4 efg +server recv part 2 (more: 0): msg #4 efg client recv: MSG #4 EFG +client recv: msg# was 2 client send: msg #5 fgh -client recv: msg# was 0 +server recv part 2 (more: 0): msg #5 fgh +client recv: MSG #5 FGH +client recv: msg# was 2 client send: msg #6 ghi -server recv part 0 (more: 0): msg #6 ghi +server recv part 2 (more: 0): msg #6 ghi client recv: MSG #6 GHI +client recv: msg# was 2 client send: msg #7 hij -client recv: msg# was 0 +server recv part 2 (more: 0): msg #7 hij +client recv: MSG #7 HIJ +client recv: msg# was 2 --- client send: part 1/2 client send: part 2/2 -server recv part 0 (more: 1): part 1/2 -server recv part 1 (more: 0): part 2/2 +server recv part 2 (more: 1): part 1/2 +server recv part 3 (more: 0): part 2/2 +client recv (more: 1): PART 1/2 client recv (more: 1): PART 2/2 -client recv (more: 0): msg# was 1 +client recv (more: 0): msg# was 3 --- client send: part 1/3 client send: part 2/3 client send: part 3/3 -server recv part 0 (more: 1): part 1/3 -server recv part 1 (more: 1): part 2/3 -server recv part 2 (more: 0): part 3/3 +server recv part 2 (more: 1): part 1/3 +server recv part 3 (more: 1): part 2/3 +server recv part 4 (more: 0): part 3/3 +client recv (more: 1): PART 1/3 +client recv (more: 1): PART 2/3 client recv (more: 1): PART 3/3 -client recv (more: 0): msg# was 2 +client recv (more: 0): msg# was 4 --- client send: part 1/4 client send: part 2/4 client send: part 3/4 client send: part 4/4 -server recv part 0 (more: 1): part 1/4 -server recv part 1 (more: 1): part 2/4 -server recv part 2 (more: 1): part 3/4 -server recv part 3 (more: 0): part 4/4 +server recv part 2 (more: 1): part 1/4 +server recv part 3 (more: 1): part 2/4 +server recv part 4 (more: 1): part 3/4 +server recv part 5 (more: 0): part 4/4 +client recv (more: 1): PART 1/4 +client recv (more: 1): PART 2/4 +client recv (more: 1): PART 3/4 client recv (more: 1): PART 4/4 -client recv (more: 0): msg# was 3 +client recv (more: 0): msg# was 5 +--- +client send: Done receiving +server recv: Done receiving +server send: TEST WRITE CALLBACK +client recv: TEST WRITE CALLBACK child exited.