Skip to content

Commit

Permalink
use recvmmsg() when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
tony2001 committed Jun 25, 2015
1 parent 156ee72 commit 607afe9
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 5 deletions.
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ AC_CHECK_HEADERS(limits.h syslimits.h string.h strings.h unistd.h stdint.h)
AC_PROG_SED
AC_PROG_AWK

AC_CHECK_FUNCS([strndup sysconf])
AC_CHECK_FUNCS([strndup sysconf recvmmsg])

AX_CONFIG_NICE([config.nice])

Expand Down
92 changes: 88 additions & 4 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1351,14 +1351,97 @@ char *pinba_error_ex(int return_error, int type, const char *file, int line, con
}
/* }}} */

#if PINBA_ENGINE_HAVE_RECVMMSG
#define PINBA_VLEN 64
void pinba_eat_udp(pinba_socket *sock) /* {{{ */
{
int i;
struct mmsghdr msgs[PINBA_VLEN];
struct iovec iovecs[PINBA_VLEN];
char bufs[PINBA_VLEN][PINBA_UDP_BUFFER_SIZE];

for (i = 0; i < PINBA_VLEN; i++) {
iovecs[i].iov_base = bufs[i];
iovecs[i].iov_len = PINBA_UDP_BUFFER_SIZE;
msgs[i].msg_hdr.msg_iov = &iovecs[i];
msgs[i].msg_hdr.msg_iovlen = 1;
}

for (;;) {
int num;

num = recvmmsg(sock->listen_sock, msgs, PINBA_VLEN, 0, NULL);

if (num > 0) {
pinba_data_bucket *bucket;
pinba_pool *data_pool;

pthread_rwlock_wrlock(&D->data_lock);
data_pool = &D->data_pool[D->data_pool_num];

if (UNLIKELY((data_pool->size - data_pool->in) <= num)) {
size_t new_size = data_pool->size * 2;
if (new_size > D->settings.temp_pool_size_limit) {
new_size = D->settings.temp_pool_size_limit;
}

if (new_size > data_pool->size) {
pinba_warning("growing data_pool to new size: %ld", data_pool->size);
if (pinba_pool_grow(data_pool, new_size - data_pool->size) != P_SUCCESS) {
pthread_rwlock_unlock(&D->data_lock);
pinba_error(P_ERROR, "out of memory");
continue;
}
} else {
pinba_warning("failed to grow data pool: we've reached the size limit of %ld", D->settings.temp_pool_size_limit);
}
}

if (UNLIKELY((data_pool->size - data_pool->in) <= num)) {
/* the pool is still full, can't do anything about it =( */
pthread_rwlock_unlock(&D->data_lock);
continue;
} else {
for (i = 0; i < num; i++) {
if (msgs[i].msg_len > 0) {
bucket = DATA_POOL(data_pool) + data_pool->in;
bucket->len = 0;
if (bucket->alloc_len < msgs[i].msg_len) {
bucket->buf = (char *)realloc(bucket->buf, msgs[i].msg_len);
bucket->alloc_len = msgs[i].msg_len;
}

if (UNLIKELY(!bucket->buf)) {
/* OUT OF MEM */
bucket->alloc_len = 0;
} else {
memcpy(bucket->buf, bufs[i], msgs[i].msg_len);
bucket->len = msgs[i].msg_len;

data_pool->in++;
}
}
}
}
pthread_rwlock_unlock(&D->data_lock);
} else if (num < 0) {
if (errno == EINTR) {
continue;
}
pinba_error(P_WARNING, "recvmmsg() failed: %s (%d)", strerror(errno), errno);
} else {
pinba_error(P_WARNING, "recvmmsg() returned 0");
}
}
}
/* }}} */
#else
void pinba_eat_udp(pinba_socket *sock) /* {{{ */
{
for (;;) {
int ret;
unsigned char buf[PINBA_UDP_BUFFER_SIZE];

start:

ret = recv(sock->listen_sock, buf, PINBA_UDP_BUFFER_SIZE, 0);

if (ret > 0) {
Expand All @@ -1379,7 +1462,7 @@ void pinba_eat_udp(pinba_socket *sock) /* {{{ */
if (pinba_pool_grow(data_pool, new_size - data_pool->size) != P_SUCCESS) {
pthread_rwlock_unlock(&D->data_lock);
pinba_error(P_ERROR, "out of memory");
goto start;
continue;
}
} else {
pinba_warning("failed to grow data pool: we've reached the size limit of %ld", D->settings.temp_pool_size_limit);
Expand All @@ -1389,7 +1472,7 @@ void pinba_eat_udp(pinba_socket *sock) /* {{{ */
if (UNLIKELY(data_pool->in == (data_pool->size - 1))) {
/* the pool is still full, can't do anything about it =( */
pthread_rwlock_unlock(&D->data_lock);
goto start;
continue;
} else {
bucket = DATA_POOL(data_pool) + data_pool->in;
bucket->len = 0;
Expand Down Expand Up @@ -1420,6 +1503,7 @@ void pinba_eat_udp(pinba_socket *sock) /* {{{ */
}
}
/* }}} */
#endif

void pinba_socket_free(pinba_socket *socket) /* {{{ */
{
Expand Down

0 comments on commit 607afe9

Please sign in to comment.