Skip to content

Commit

Permalink
Fixing issue #179 (#180)
Browse files Browse the repository at this point in the history
* add logic to use pipe for returning connections back to server thread

* Squashed 'deps/ccommon/' changes from bb298bc..a2d5c01

git-subtree-dir: deps/ccommon
git-subtree-split: a2d5c01

* fix some bugs

* remove sleep logic from server thread, create stream object directly in admin thread

* adding comment around pooled resources
  • Loading branch information
Yao Yue authored Jul 5, 2018
1 parent db89acc commit 78353d0
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 92 deletions.
3 changes: 2 additions & 1 deletion deps/ccommon/include/channel/cc_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ ssize_t tcp_recvv(struct tcp_conn *c, struct array *bufv, size_t nbyte);
ssize_t tcp_sendv(struct tcp_conn *c, struct array *bufv, size_t nbyte);

bool tcp_accept(struct tcp_conn *sc, struct tcp_conn *c); /* channel_accept_fn */
void tcp_reject(struct tcp_conn *sc); /* channel_reject_fn */
void tcp_reject(struct tcp_conn *sc); /* channel_reject_fn */
void tcp_reject_all(struct tcp_conn *sc); /* channel_reject_fn */

/* functions getting/setting connection attribute */
int tcp_set_blocking(int sd);
Expand Down
44 changes: 44 additions & 0 deletions deps/ccommon/src/channel/cc_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ tcp_accept(struct tcp_conn *sc, struct tcp_conn *c)
return true;
}


/*
* due to lack of a direct rejection API in POSIX, tcp_reject accepts the
* frontmost connection and immediately closes it
*/
void
tcp_reject(struct tcp_conn *sc)
{
Expand All @@ -397,6 +402,45 @@ tcp_reject(struct tcp_conn *sc)
}
}

/*
* due to lack of a direct rejection API in POSIX, tcp_reject_all accepts
* connections ready on the listening socket, and immediately closes them.
* It does so until there are no more pending connections.
*/
void
tcp_reject_all(struct tcp_conn *sc)
{
int ret;
int sd;

for (;;) {
sd = accept(sc->sd, NULL, NULL);
if (sd < 0) {
if (errno == EINTR) {
log_debug("sd %d not ready: eintr", sc->sd);
continue;
}

if (errno == EAGAIN || errno == EWOULDBLOCK) {
log_debug("sd %d has no more outstanding connections", sc->sd);
return;
}

log_error("accept on sd %d failed: %s", sc->sd, strerror(errno));
INCR(tcp_metrics, tcp_reject_ex);
return;
}

ret = close(sd);
if (ret < 0) {
INCR(tcp_metrics, tcp_reject_ex);
log_warn("close c %d failed, ignored: %s", sd, strerror(errno));
}

INCR(tcp_metrics, tcp_reject);
}
}

int
tcp_set_blocking(int sd)
{
Expand Down
10 changes: 5 additions & 5 deletions deps/ccommon/test/channel/tcp/check_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ find_port_listen(struct tcp_conn **_conn_listen, struct addrinfo **_ai, uint16_t
freeaddrinfo(ai);
}
/* for some reason this line is needed, I would appreciate some insight */
ck_assert_int_eq(tcp_connect(ai, conn_client), true);
ck_assert(tcp_connect(ai, conn_client));
tcp_reject(conn_listen);

if (_conn_listen) {
Expand Down Expand Up @@ -157,7 +157,7 @@ START_TEST(test_client_send_server_recv)
conn_server = tcp_conn_create();
ck_assert_ptr_ne(conn_server, NULL);

ck_assert_int_eq(tcp_accept(conn_listen, conn_server), true);
ck_assert(tcp_accept(conn_listen, conn_server));
ck_assert_int_eq(tcp_send(conn_client, send_data, LEN), LEN);
while ((recv = tcp_recv(conn_server, recv_data, LEN + 1)) == CC_EAGAIN) {}
ck_assert_int_eq(recv, LEN);
Expand Down Expand Up @@ -199,7 +199,7 @@ START_TEST(test_server_send_client_recv)
conn_server = tcp_conn_create();
ck_assert_ptr_ne(conn_server, NULL);

ck_assert_int_eq(tcp_accept(conn_listen, conn_server), true);
ck_assert(tcp_accept(conn_listen, conn_server));
ck_assert_int_eq(tcp_send(conn_server, send_data, LEN), LEN);
while ((recv = tcp_recv(conn_client, recv_data, LEN + 1)) == CC_EAGAIN) {}
ck_assert_int_eq(recv, LEN);
Expand Down Expand Up @@ -252,7 +252,7 @@ START_TEST(test_client_sendv_server_recvv)
conn_server = tcp_conn_create();
ck_assert_ptr_ne(conn_server, NULL);

ck_assert_int_eq(tcp_accept(conn_listen, conn_server), true);
ck_assert(tcp_accept(conn_listen, conn_server));
ck_assert_int_eq(tcp_sendv(conn_client, send_array, LEN), LEN);
while ((recv = tcp_recvv(conn_server, recv_array, LEN + 1)) == CC_EAGAIN) {}
ck_assert_int_eq(recv, LEN);
Expand Down Expand Up @@ -315,7 +315,7 @@ START_TEST(test_nonblocking)
conn_server = tcp_conn_create();
ck_assert_ptr_ne(conn_server, NULL);

ck_assert_int_eq(tcp_accept(conn_listen, conn_server), true);
ck_assert(tcp_accept(conn_listen, conn_server));

task.usleep = SLEEP_TIME;
task.c = conn_server;
Expand Down
8 changes: 4 additions & 4 deletions src/core/admin/admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ _admin_close(struct buf_sock *s)
{
event_del(ctx->evb, hdl->rid(s->ch));
hdl->term(s->ch);
buf_sock_return(&s);
buf_sock_destroy(&s);
}

static inline void
Expand All @@ -50,7 +50,7 @@ _tcp_accept(struct buf_sock *ss)
struct buf_sock *s;
struct tcp_conn *sc = ss->ch;

s = buf_sock_borrow();
s = buf_sock_create(); /* admin thread: always directly create not borrow */
if (s == NULL) {
log_error("establish connection failed: cannot allocate buf_sock, "
"reject connection request");
Expand Down Expand Up @@ -245,7 +245,7 @@ core_admin_setup(admin_options_st *options)
hdl->rid = (channel_id_fn)tcp_read_id;
hdl->wid = (channel_id_fn)tcp_write_id;

admin_sock = buf_sock_borrow();
admin_sock = buf_sock_create();
if (admin_sock == NULL) {
log_crit("failed to set up admin thread; could not get buf_sock");
goto error;
Expand Down Expand Up @@ -294,7 +294,7 @@ core_admin_teardown(void)
timing_wheel_destroy(&tw);
event_base_destroy(&(ctx->evb));
freeaddrinfo(admin_ai);
buf_sock_return(&admin_sock);
buf_sock_destroy(&admin_sock);
}
admin_init = false;
}
Expand Down
148 changes: 100 additions & 48 deletions src/core/data/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

#define SERVER_MODULE_NAME "core::server"

#define SLEEP_CONN_USEC 10000 /* sleep for 10ms on out-of-stream-object error */

struct pipe_conn *pipe_c = NULL;
struct ring_array *conn_arr = NULL;
struct pipe_conn *pipe_new = NULL; /* server(w) -> worker(r) */
struct pipe_conn *pipe_term = NULL; /* worker(w) -> server(r) */
struct ring_array *conn_new = NULL; /* server(w) -> worker(r) */
struct ring_array *conn_term = NULL; /* worker(w) -> server(r) */

static server_metrics_st *server_metrics = NULL;

Expand All @@ -36,6 +36,13 @@ static channel_handler_st *hdl = &handlers;
static struct addrinfo *server_ai;
static struct buf_sock *server_sock; /* server buf_sock */

/* Note: server thread currently owns the stream (buf_sock) pool. Other threads
* either need to get the connection from server (the case for worker thread) or
* have to directly create their own, instead of borrowing (the case for admin
* thread), to avoid concurrency issues around pooling operations, which are not
* thread-safe.
*/

static inline void
_server_close(struct buf_sock *s)
{
Expand All @@ -50,20 +57,48 @@ _server_close(struct buf_sock *s)
static inline void
_server_pipe_write(void)
{
ASSERT(pipe_c != NULL);
ASSERT(pipe_new != NULL);

ssize_t status = pipe_send(pipe_c, "", 1);
ssize_t status = pipe_send(pipe_new, "", 1);

if (status == 0 || status == CC_EAGAIN) {
/* retry write */
log_verb("server core: retry send on pipe");
event_add_write(ctx->evb, pipe_write_id(pipe_c), NULL);
event_add_write(ctx->evb, pipe_write_id(pipe_new), NULL);
} else if (status == CC_ERROR) {
/* other reasn write can't be done */
log_error("could not write to pipe - %s", strerror(pipe_c->err));
log_error("could not write to pipe - %s", strerror(pipe_new->err));
}
}

/* pipe_read recycles returned streams from worker thread */
static inline void
_server_pipe_read(void)
{
struct buf_sock *s;
char buf[RING_ARRAY_DEFAULT_CAP]; /* buffer for discarding pipe data */
int i;
rstatus_i status;

ASSERT(pipe_term != NULL);

i = pipe_recv(pipe_term, buf, RING_ARRAY_DEFAULT_CAP);
if (i < 0) { /* errors, do not read from ring array */
log_warn("not reclaiming connections due to pipe error");
return;
}

/* else, pipe write succeeded and no action needs to be taken */
/* each byte in the pipe corresponds to a connection in the array */
for (; i > 0; --i) {
status = ring_array_pop(&s, conn_term);
if (status != CC_OK) {
log_warn("event number does not match conn queue: missing %d conns",
i);
return;
}
log_verb("Recycling buf_sock %p from worker thread", s);
hdl->term(s->ch);
buf_sock_return(&s);
}
}

/* returns true if a connection is present, false if no more pending */
Expand Down Expand Up @@ -112,7 +147,6 @@ _tcp_accept(struct buf_sock *ss)
log_error("establish connection failed: cannot allocate buf_sock, "
"reject connection request");
ss->hdl->reject(sc); /* server rejects connection by closing it */
usleep(SLEEP_CONN_USEC);
return false;
}

Expand All @@ -122,8 +156,8 @@ _tcp_accept(struct buf_sock *ss)
}

/* push buf_sock to queue */
ring_array_push(&s, conn_arr);

ring_array_push(&s, conn_new);
/* notify worker, note this may fail and will be retried via write event */
_server_pipe_write();

return true;
Expand All @@ -143,30 +177,33 @@ static void
_server_event(void *arg, uint32_t events)
{
struct buf_sock *s = arg;

log_verb("server event %06"PRIX32" on buf_sock %p", events, s);

if (events & EVENT_ERR) {
INCR(server_metrics, server_event_error);
_server_close(s);

return;
}

if (events & EVENT_READ) {
log_verb("processing server read event on buf_sock %p", s);

INCR(server_metrics, server_event_read);
_server_event_read(s);
}

if (events & EVENT_WRITE) {
/* the only server write event is write on pipe */

log_verb("processing server write event");
_server_pipe_write();

INCR(server_metrics, server_event_write);
log_verb("server event %06"PRIX32" with data %p", events, s);

if (s == NULL) { /* event on pipe */
if (events & EVENT_READ) { /* terminating connection from worker */
log_verb("processing server read event on pipe");
INCR(server_metrics, server_event_read);
_server_pipe_read();
} else if (events & EVENT_WRITE) { /* retrying worker notification */
log_verb("processing server write event on pipe");
INCR(server_metrics, server_event_write);
_server_pipe_write();
} else { /* EVENT_ERR */
log_debug("processing server error event on pipe");
INCR(server_metrics, server_event_error);
}
} else { /* event on listening socket */
if (events & EVENT_READ) {
log_verb("processing server read event on buf_sock %p", s);
INCR(server_metrics, server_event_read);
_server_event_read(s);
} else if (events & EVENT_ERR) { /* effectively refusing new conn */
/* TODO: shall we retry bind and listen ? */
log_debug("processing server error event on listening socket");
_server_close(s);
} else {
NOT_REACHED();
}
}
}

Expand Down Expand Up @@ -196,22 +233,34 @@ core_server_setup(server_options_st *options, server_metrics_st *metrics)
}

/* setup shared data structures between server and worker */
pipe_c = pipe_conn_create();
if (pipe_c == NULL) {
pipe_new = pipe_conn_create();
pipe_term = pipe_conn_create();
if (pipe_new == NULL || pipe_term == NULL) {
log_error("Could not create connection for pipe, abort");
goto error;
}

if (!pipe_open(NULL, pipe_c)) {
log_error("Could not open pipe connection: %s", strerror(pipe_c->err));
if (!pipe_open(NULL, pipe_new)) {
log_error("Could not open pipe for new connection: %s",
strerror(pipe_new->err));
goto error;
}
if (!pipe_open(NULL, pipe_term)) {
log_error("Could not open pipe for terminated connection: %s",
strerror(pipe_term->err));
goto error;
}


pipe_set_nonblocking(pipe_c);
pipe_set_nonblocking(pipe_new);
pipe_set_nonblocking(pipe_term);

conn_arr = ring_array_create(sizeof(struct buf_sock *), RING_ARRAY_DEFAULT_CAP);
if (conn_arr == NULL) {
log_error("core setup failed: could not allocate conn array");
conn_new = ring_array_create(sizeof(struct buf_sock *),
RING_ARRAY_DEFAULT_CAP);
conn_term = ring_array_create(sizeof(struct buf_sock *),
RING_ARRAY_DEFAULT_CAP);
if (conn_new == NULL || conn_term == NULL) {
log_error("core setup failed: could not allocate conn array(s)");
goto error;
}

Expand All @@ -223,7 +272,7 @@ core_server_setup(server_options_st *options, server_metrics_st *metrics)
}

hdl->accept = (channel_accept_fn)tcp_accept;
hdl->reject = (channel_reject_fn)tcp_reject;
hdl->reject = (channel_reject_fn)tcp_reject_all;
hdl->open = (channel_open_fn)tcp_listen;
hdl->term = (channel_term_fn)tcp_close;
hdl->recv = (channel_recv_fn)tcp_recv;
Expand Down Expand Up @@ -259,6 +308,7 @@ core_server_setup(server_options_st *options, server_metrics_st *metrics)
c->level = CHANNEL_META;

event_add_read(ctx->evb, hdl->rid(c), server_sock);
event_add_read(ctx->evb, pipe_read_id(pipe_term), NULL);

server_init = true;

Expand All @@ -280,8 +330,10 @@ core_server_teardown(void)
freeaddrinfo(server_ai);
buf_sock_return(&server_sock);
}
ring_array_destroy(conn_arr);
pipe_conn_destroy(&pipe_c);
ring_array_destroy(conn_term);
ring_array_destroy(conn_new);
pipe_conn_destroy(&pipe_term);
pipe_conn_destroy(&pipe_new);
server_metrics = NULL;
server_init = false;
}
Expand Down
6 changes: 4 additions & 2 deletions src/core/data/shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ struct pipe_conn;
struct ring_array;

/* pipe for server/worker thread communication */
extern struct pipe_conn *pipe_c;
extern struct pipe_conn *pipe_new;
extern struct pipe_conn *pipe_term;

/* array holding accepted connections */
extern struct ring_array *conn_arr;
extern struct ring_array *conn_new;
extern struct ring_array *conn_term;
Loading

0 comments on commit 78353d0

Please sign in to comment.