diff --git a/deps/ccommon/include/channel/cc_tcp.h b/deps/ccommon/include/channel/cc_tcp.h index d1dad2bf3..c4babcae8 100644 --- a/deps/ccommon/include/channel/cc_tcp.h +++ b/deps/ccommon/include/channel/cc_tcp.h @@ -128,7 +128,8 @@ ssize_t tcp_recvv(struct tcp_conn *c, struct array *bufv, size_t nbyte); ssize_t tcp_sendv(struct tcp_conn *c, struct array *bufv, size_t nbyte); bool tcp_accept(struct tcp_conn *sc, struct tcp_conn *c); /* channel_accept_fn */ -void tcp_reject(struct tcp_conn *sc); /* channel_reject_fn */ +void tcp_reject(struct tcp_conn *sc); /* channel_reject_fn */ +void tcp_reject_all(struct tcp_conn *sc); /* channel_reject_fn */ /* functions getting/setting connection attribute */ int tcp_set_blocking(int sd); diff --git a/deps/ccommon/src/channel/cc_tcp.c b/deps/ccommon/src/channel/cc_tcp.c index 244123c24..d482ac9c0 100644 --- a/deps/ccommon/src/channel/cc_tcp.c +++ b/deps/ccommon/src/channel/cc_tcp.c @@ -377,6 +377,11 @@ tcp_accept(struct tcp_conn *sc, struct tcp_conn *c) return true; } + +/* + * due to lack of a direct rejection API in POSIX, tcp_reject accepts the + * frontmost connection and immediately closes it + */ void tcp_reject(struct tcp_conn *sc) { @@ -397,6 +402,45 @@ tcp_reject(struct tcp_conn *sc) } } +/* + * due to lack of a direct rejection API in POSIX, tcp_reject_all accepts + * connections ready on the listening socket, and immediately closes them. + * It does so until there are no more pending connections. 
+ */ +void +tcp_reject_all(struct tcp_conn *sc) +{ + int ret; + int sd; + + for (;;) { + sd = accept(sc->sd, NULL, NULL); + if (sd < 0) { + if (errno == EINTR) { + log_debug("sd %d not ready: eintr", sc->sd); + continue; + } + + if (errno == EAGAIN || errno == EWOULDBLOCK) { + log_debug("sd %d has no more outstanding connections", sc->sd); + return; + } + + log_error("accept on sd %d failed: %s", sc->sd, strerror(errno)); + INCR(tcp_metrics, tcp_reject_ex); + return; + } + + ret = close(sd); + if (ret < 0) { + INCR(tcp_metrics, tcp_reject_ex); + log_warn("close c %d failed, ignored: %s", sd, strerror(errno)); + } + + INCR(tcp_metrics, tcp_reject); + } +} + int tcp_set_blocking(int sd) { diff --git a/deps/ccommon/test/channel/tcp/check_tcp.c b/deps/ccommon/test/channel/tcp/check_tcp.c index e185d4733..1e39156ba 100644 --- a/deps/ccommon/test/channel/tcp/check_tcp.c +++ b/deps/ccommon/test/channel/tcp/check_tcp.c @@ -72,7 +72,7 @@ find_port_listen(struct tcp_conn **_conn_listen, struct addrinfo **_ai, uint16_t freeaddrinfo(ai); } /* for some reason this line is needed, I would appreciate some insight */ - ck_assert_int_eq(tcp_connect(ai, conn_client), true); + ck_assert(tcp_connect(ai, conn_client)); tcp_reject(conn_listen); if (_conn_listen) { @@ -157,7 +157,7 @@ START_TEST(test_client_send_server_recv) conn_server = tcp_conn_create(); ck_assert_ptr_ne(conn_server, NULL); - ck_assert_int_eq(tcp_accept(conn_listen, conn_server), true); + ck_assert(tcp_accept(conn_listen, conn_server)); ck_assert_int_eq(tcp_send(conn_client, send_data, LEN), LEN); while ((recv = tcp_recv(conn_server, recv_data, LEN + 1)) == CC_EAGAIN) {} ck_assert_int_eq(recv, LEN); @@ -199,7 +199,7 @@ START_TEST(test_server_send_client_recv) conn_server = tcp_conn_create(); ck_assert_ptr_ne(conn_server, NULL); - ck_assert_int_eq(tcp_accept(conn_listen, conn_server), true); + ck_assert(tcp_accept(conn_listen, conn_server)); ck_assert_int_eq(tcp_send(conn_server, send_data, LEN), LEN); while ((recv = 
tcp_recv(conn_client, recv_data, LEN + 1)) == CC_EAGAIN) {} ck_assert_int_eq(recv, LEN); @@ -252,7 +252,7 @@ START_TEST(test_client_sendv_server_recvv) conn_server = tcp_conn_create(); ck_assert_ptr_ne(conn_server, NULL); - ck_assert_int_eq(tcp_accept(conn_listen, conn_server), true); + ck_assert(tcp_accept(conn_listen, conn_server)); ck_assert_int_eq(tcp_sendv(conn_client, send_array, LEN), LEN); while ((recv = tcp_recvv(conn_server, recv_array, LEN + 1)) == CC_EAGAIN) {} ck_assert_int_eq(recv, LEN); @@ -315,7 +315,7 @@ START_TEST(test_nonblocking) conn_server = tcp_conn_create(); ck_assert_ptr_ne(conn_server, NULL); - ck_assert_int_eq(tcp_accept(conn_listen, conn_server), true); + ck_assert(tcp_accept(conn_listen, conn_server)); task.usleep = SLEEP_TIME; task.c = conn_server; diff --git a/src/core/admin/admin.c b/src/core/admin/admin.c index 16319f6af..eea8cddcb 100644 --- a/src/core/admin/admin.c +++ b/src/core/admin/admin.c @@ -41,7 +41,7 @@ _admin_close(struct buf_sock *s) { event_del(ctx->evb, hdl->rid(s->ch)); hdl->term(s->ch); - buf_sock_return(&s); + buf_sock_destroy(&s); } static inline void @@ -50,7 +50,7 @@ _tcp_accept(struct buf_sock *ss) struct buf_sock *s; struct tcp_conn *sc = ss->ch; - s = buf_sock_borrow(); + s = buf_sock_create(); /* admin thread: always directly create not borrow */ if (s == NULL) { log_error("establish connection failed: cannot allocate buf_sock, " "reject connection request"); @@ -245,7 +245,7 @@ core_admin_setup(admin_options_st *options) hdl->rid = (channel_id_fn)tcp_read_id; hdl->wid = (channel_id_fn)tcp_write_id; - admin_sock = buf_sock_borrow(); + admin_sock = buf_sock_create(); if (admin_sock == NULL) { log_crit("failed to set up admin thread; could not get buf_sock"); goto error; @@ -294,7 +294,7 @@ core_admin_teardown(void) timing_wheel_destroy(&tw); event_base_destroy(&(ctx->evb)); freeaddrinfo(admin_ai); - buf_sock_return(&admin_sock); + buf_sock_destroy(&admin_sock); } admin_init = false; } diff --git 
a/src/core/data/server.c b/src/core/data/server.c index e6285ad1b..ef859a4ab 100644 --- a/src/core/data/server.c +++ b/src/core/data/server.c @@ -20,10 +20,10 @@ #define SERVER_MODULE_NAME "core::server" -#define SLEEP_CONN_USEC 10000 /* sleep for 10ms on out-of-stream-object error */ - -struct pipe_conn *pipe_c = NULL; -struct ring_array *conn_arr = NULL; +struct pipe_conn *pipe_new = NULL; /* server(w) -> worker(r) */ +struct pipe_conn *pipe_term = NULL; /* worker(w) -> server(r) */ +struct ring_array *conn_new = NULL; /* server(w) -> worker(r) */ +struct ring_array *conn_term = NULL; /* worker(w) -> server(r) */ static server_metrics_st *server_metrics = NULL; @@ -36,6 +36,13 @@ static channel_handler_st *hdl = &handlers; static struct addrinfo *server_ai; static struct buf_sock *server_sock; /* server buf_sock */ +/* Note: server thread currently owns the stream (buf_sock) pool. Other threads + * either need to get the connection from server (the case for worker thread) or + * have to directly create their own, instead of borrowing (the case for admin + * thread), to avoid concurrency issues around pooling operations, which are not + * thread-safe. 
+ */ + static inline void _server_close(struct buf_sock *s) { @@ -50,20 +57,48 @@ _server_close(struct buf_sock *s) static inline void _server_pipe_write(void) { - ASSERT(pipe_c != NULL); + ASSERT(pipe_new != NULL); - ssize_t status = pipe_send(pipe_c, "", 1); + ssize_t status = pipe_send(pipe_new, "", 1); if (status == 0 || status == CC_EAGAIN) { /* retry write */ log_verb("server core: retry send on pipe"); - event_add_write(ctx->evb, pipe_write_id(pipe_c), NULL); + event_add_write(ctx->evb, pipe_write_id(pipe_new), NULL); } else if (status == CC_ERROR) { - /* other reasn write can't be done */ - log_error("could not write to pipe - %s", strerror(pipe_c->err)); + log_error("could not write to pipe - %s", strerror(pipe_new->err)); + } +} + +/* pipe_read recycles returned streams from worker thread */ +static inline void +_server_pipe_read(void) +{ + struct buf_sock *s; + char buf[RING_ARRAY_DEFAULT_CAP]; /* buffer for discarding pipe data */ + int i; + rstatus_i status; + + ASSERT(pipe_term != NULL); + + i = pipe_recv(pipe_term, buf, RING_ARRAY_DEFAULT_CAP); + if (i < 0) { /* errors, do not read from ring array */ + log_warn("not reclaiming connections due to pipe error"); + return; } - /* else, pipe write succeeded and no action needs to be taken */ + /* each byte in the pipe corresponds to a connection in the array */ + for (; i > 0; --i) { + status = ring_array_pop(&s, conn_term); + if (status != CC_OK) { + log_warn("event number does not match conn queue: missing %d conns", + i); + return; + } + log_verb("Recycling buf_sock %p from worker thread", s); + hdl->term(s->ch); + buf_sock_return(&s); + } } /* returns true if a connection is present, false if no more pending */ @@ -112,7 +147,6 @@ _tcp_accept(struct buf_sock *ss) log_error("establish connection failed: cannot allocate buf_sock, " "reject connection request"); ss->hdl->reject(sc); /* server rejects connection by closing it */ - usleep(SLEEP_CONN_USEC); return false; } @@ -122,8 +156,8 @@ 
_tcp_accept(struct buf_sock *ss) } /* push buf_sock to queue */ - ring_array_push(&s, conn_arr); - + ring_array_push(&s, conn_new); + /* notify worker, note this may fail and will be retried via write event */ _server_pipe_write(); return true; @@ -143,30 +177,33 @@ static void _server_event(void *arg, uint32_t events) { struct buf_sock *s = arg; - - log_verb("server event %06"PRIX32" on buf_sock %p", events, s); - - if (events & EVENT_ERR) { - INCR(server_metrics, server_event_error); - _server_close(s); - - return; - } - - if (events & EVENT_READ) { - log_verb("processing server read event on buf_sock %p", s); - - INCR(server_metrics, server_event_read); - _server_event_read(s); - } - - if (events & EVENT_WRITE) { - /* the only server write event is write on pipe */ - - log_verb("processing server write event"); - _server_pipe_write(); - - INCR(server_metrics, server_event_write); + log_verb("server event %06"PRIX32" with data %p", events, s); + + if (s == NULL) { /* event on pipe */ + if (events & EVENT_READ) { /* terminating connection from worker */ + log_verb("processing server read event on pipe"); + INCR(server_metrics, server_event_read); + _server_pipe_read(); + } else if (events & EVENT_WRITE) { /* retrying worker notification */ + log_verb("processing server write event on pipe"); + INCR(server_metrics, server_event_write); + _server_pipe_write(); + } else { /* EVENT_ERR */ + log_debug("processing server error event on pipe"); + INCR(server_metrics, server_event_error); + } + } else { /* event on listening socket */ + if (events & EVENT_READ) { + log_verb("processing server read event on buf_sock %p", s); + INCR(server_metrics, server_event_read); + _server_event_read(s); + } else if (events & EVENT_ERR) { /* effectively refusing new conn */ + /* TODO: shall we retry bind and listen ? 
*/ + log_debug("processing server error event on listening socket"); + _server_close(s); + } else { + NOT_REACHED(); + } } } @@ -196,22 +233,34 @@ core_server_setup(server_options_st *options, server_metrics_st *metrics) } /* setup shared data structures between server and worker */ - pipe_c = pipe_conn_create(); - if (pipe_c == NULL) { + pipe_new = pipe_conn_create(); + pipe_term = pipe_conn_create(); + if (pipe_new == NULL || pipe_term == NULL) { log_error("Could not create connection for pipe, abort"); goto error; } - if (!pipe_open(NULL, pipe_c)) { - log_error("Could not open pipe connection: %s", strerror(pipe_c->err)); + if (!pipe_open(NULL, pipe_new)) { + log_error("Could not open pipe for new connection: %s", + strerror(pipe_new->err)); goto error; } + if (!pipe_open(NULL, pipe_term)) { + log_error("Could not open pipe for terminated connection: %s", + strerror(pipe_term->err)); + goto error; + } + - pipe_set_nonblocking(pipe_c); + pipe_set_nonblocking(pipe_new); + pipe_set_nonblocking(pipe_term); - conn_arr = ring_array_create(sizeof(struct buf_sock *), RING_ARRAY_DEFAULT_CAP); - if (conn_arr == NULL) { - log_error("core setup failed: could not allocate conn array"); + conn_new = ring_array_create(sizeof(struct buf_sock *), + RING_ARRAY_DEFAULT_CAP); + conn_term = ring_array_create(sizeof(struct buf_sock *), + RING_ARRAY_DEFAULT_CAP); + if (conn_new == NULL || conn_term == NULL) { + log_error("core setup failed: could not allocate conn array(s)"); goto error; } @@ -223,7 +272,7 @@ core_server_setup(server_options_st *options, server_metrics_st *metrics) } hdl->accept = (channel_accept_fn)tcp_accept; - hdl->reject = (channel_reject_fn)tcp_reject; + hdl->reject = (channel_reject_fn)tcp_reject_all; hdl->open = (channel_open_fn)tcp_listen; hdl->term = (channel_term_fn)tcp_close; hdl->recv = (channel_recv_fn)tcp_recv; @@ -259,6 +308,7 @@ core_server_setup(server_options_st *options, server_metrics_st *metrics) c->level = CHANNEL_META; event_add_read(ctx->evb, 
hdl->rid(c), server_sock); + event_add_read(ctx->evb, pipe_read_id(pipe_term), NULL); server_init = true; @@ -280,8 +330,10 @@ core_server_teardown(void) freeaddrinfo(server_ai); buf_sock_return(&server_sock); } - ring_array_destroy(conn_arr); - pipe_conn_destroy(&pipe_c); + ring_array_destroy(conn_term); + ring_array_destroy(conn_new); + pipe_conn_destroy(&pipe_term); + pipe_conn_destroy(&pipe_new); server_metrics = NULL; server_init = false; } diff --git a/src/core/data/shared.h b/src/core/data/shared.h index c0529593a..ad6760bfa 100644 --- a/src/core/data/shared.h +++ b/src/core/data/shared.h @@ -4,7 +4,9 @@ struct pipe_conn; struct ring_array; /* pipe for server/worker thread communication */ -extern struct pipe_conn *pipe_c; +extern struct pipe_conn *pipe_new; +extern struct pipe_conn *pipe_term; /* array holding accepted connections */ -extern struct ring_array *conn_arr; +extern struct ring_array *conn_new; +extern struct ring_array *conn_term; diff --git a/src/core/data/worker.c b/src/core/data/worker.c index ef11f4efe..b383880e3 100644 --- a/src/core/data/worker.c +++ b/src/core/data/worker.c @@ -68,17 +68,6 @@ _worker_event_write(struct buf_sock *s) return status; } -static inline void -worker_close(struct buf_sock *s) -{ - log_info("worker core close on buf_sock %p", s); - - processor->error(&s->rbuf, &s->wbuf, &s->data); - event_del(ctx->evb, hdl->rid(s->ch)); - hdl->term(s->ch); - buf_sock_return(&s); -} - /* read event over an existing connection */ static inline void _worker_event_read(struct buf_sock *s) @@ -101,7 +90,7 @@ _worker_event_read(struct buf_sock *s) } static void -worker_add_conn(void) +worker_add_stream(void) { struct buf_sock *s; char buf[RING_ARRAY_DEFAULT_CAP]; /* buffer for discarding pipe data */ @@ -117,7 +106,7 @@ worker_add_conn(void) * for the next read event in that case. 
*/ - i = pipe_recv(pipe_c, buf, RING_ARRAY_DEFAULT_CAP); + i = pipe_recv(pipe_new, buf, RING_ARRAY_DEFAULT_CAP); if (i < 0) { /* errors, do not read from ring array */ log_warn("not adding new connections due to pipe error"); return; @@ -127,7 +116,7 @@ worker_add_conn(void) * now get from the ring array */ for (; i > 0; --i) { - status = ring_array_pop(&s, conn_arr); + status = ring_array_pop(&s, conn_new); if (status != CC_OK) { log_warn("event number does not match conn queue: missing %d conns", i); @@ -136,25 +125,59 @@ worker_add_conn(void) log_verb("Adding new buf_sock %p to worker thread", s); s->owner = ctx; s->hdl = hdl; - event_add_read(ctx->evb, hdl->rid(s->ch), s); + event_add_read(ctx->evb, hdl->rid(s->ch), s); /* event activated */ + } +} + +static inline void +_worker_pipe_write(void) +{ + ASSERT(pipe_term != NULL); + + ssize_t status = pipe_send(pipe_term, "", 1); + + if (status == 0 || status == CC_EAGAIN) { + /* retry write */ + log_verb("worker core: retry send on pipe"); + event_add_write(ctx->evb, pipe_write_id(pipe_term), NULL); + } else if (status == CC_ERROR) { + log_error("could not write to pipe - %s", strerror(pipe_term->err)); } } +static void +worker_ret_stream(struct buf_sock *s) +{ + log_info("worker core marking buf_sock %p for return", s); + + /* first clean up states that only worker thread understands, + * and stop receiving event updates. 
then it's safe to return to server + */ + processor->error(&s->rbuf, &s->wbuf, &s->data); + event_del(ctx->evb, hdl->rid(s->ch)); + + /* push buf_sock to queue */ + ring_array_push(&s, conn_term); + /* notify server, note this may fail and will be retried via write event */ + _worker_pipe_write(); +} + static void _worker_event(void *arg, uint32_t events) { struct buf_sock *s = arg; - log_verb("worker event %06"PRIX32" on buf_sock %p", events, s); + log_verb("worker event %06"PRIX32" with data %p", events, s); - if (s == NULL) { - /* event on pipe_c, new connection */ - if (events & EVENT_READ) { - worker_add_conn(); - } else if (events & EVENT_ERR) { - log_error("error event received on conn_fds pipe"); - } else { - /* there should never be any write events on the pipe from worker */ - NOT_REACHED(); + if (s == NULL) { /* event on pipe */ + if (events & EVENT_READ) { /* new connection from server */ + INCR(worker_metrics, worker_event_read); + worker_add_stream(); + } else if (events & EVENT_WRITE) { /* retry return notification */ + INCR(worker_metrics, worker_event_write); + _worker_pipe_write(); + } else { /* EVENT_ERR */ + INCR(worker_metrics, worker_event_error); + log_error("error event received on pipe"); } } else { /* event on one of the connections */ @@ -189,7 +212,7 @@ _worker_event(void *arg, uint32_t events) * but simpler to implement and probably fine initially. 
*/ if (s->ch->state == CHANNEL_TERM || s->ch->state == CHANNEL_ERROR) { - worker_close(s); + worker_ret_stream(s); } } } @@ -221,16 +244,17 @@ core_worker_setup(worker_options_st *options, worker_metrics_st *metrics) exit(EX_CONFIG); } - hdl->accept = (channel_accept_fn)tcp_accept; - hdl->reject = (channel_reject_fn)tcp_reject; - hdl->open = (channel_open_fn)tcp_listen; - hdl->term = (channel_term_fn)tcp_close; + /* worker thread does not handle accept/reject/open/term directly */ + hdl->accept = NULL; + hdl->reject = NULL; + hdl->open = NULL; + hdl->term = NULL; hdl->recv = (channel_recv_fn)tcp_recv; hdl->send = (channel_send_fn)tcp_send; hdl->rid = (channel_id_fn)tcp_read_id; hdl->wid = (channel_id_fn)tcp_write_id; - event_add_read(ctx->evb, pipe_read_id(pipe_c), NULL); + event_add_read(ctx->evb, pipe_read_id(pipe_new), NULL); worker_init = true; } diff --git a/src/server/twemcache/admin/process.c b/src/server/twemcache/admin/process.c index 2edbc062a..aa2e577c5 100644 --- a/src/server/twemcache/admin/process.c +++ b/src/server/twemcache/admin/process.c @@ -31,7 +31,8 @@ admin_process_setup(void) nmetric_perslab = METRIC_CARDINALITY(perslab[0]); /* perslab metric size <(32 + 20)B, prefix/suffix 12B, total < 64 */ - cap = MAX(nmetric, nmetric_perslab * SLABCLASS_MAX_ID) * METRIC_PRINT_LEN; + cap = MAX(nmetric, nmetric_perslab * SLABCLASS_MAX_ID) * METRIC_PRINT_LEN + + METRIC_END_LEN; buf = cc_alloc(cap); /* TODO: check return status of cc_alloc */