diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index cb88e69dc..3c58ac49a 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -39,8 +39,12 @@ jobs: rm re/libre.so || true rm re/libre.dylib || true make -C rem librem.a - cd retest; cmake -B build -DCMAKE_EXE_LINKER_FLAGS="--coverage" && \ - cmake --build build -j && ./build/retest -a + cd retest && \ + cmake -B build -DCMAKE_EXE_LINKER_FLAGS="--coverage" && \ + cmake --build build -j && \ + ./build/retest -a -v && \ + ./build/retest -r -m select -v && \ + ./build/retest -r -m poll -v - name: gcov run: | diff --git a/include/re_main.h b/include/re_main.h index d8539dd53..0600e8692 100644 --- a/include/re_main.h +++ b/include/re_main.h @@ -47,6 +47,7 @@ int re_debug(struct re_printf *pf, void *unused); int re_nfds(void); int re_alloc(struct re **rep); +void re_fhs_reuse_set(struct re *re, bool reuse); int re_thread_attach(struct re *re); void re_thread_detach(void); diff --git a/src/main/main.c b/src/main/main.c index 775f73e61..1fa81c8ff 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -69,6 +70,9 @@ enum { /** File descriptor handler struct */ struct fhs { + struct le he; /**< Hash entry */ + struct le le; /**< Cache/Delete entry */ + int index; /**< Index used for arrays */ re_sock_t fd; /**< File Descriptor */ int flags; /**< Polling flags (Read, Write, etc.) */ fd_h* fh; /**< Event handler */ @@ -77,8 +81,11 @@ struct fhs { /** Polling loop data */ struct re { - struct fhs *fhs; /** File descriptor handler set */ + struct hash *fhl; /**< File descriptor hash list */ + struct list fhs_delete; /**< File descriptor delete list */ + struct list fhs_cache; /**< File descriptor cache list */ int maxfds; /**< Maximum number of polling fds */ + int max_fd; /**< Maximum fd number */ int nfds; /**< Number of active file descriptors */ enum poll_method method; /**< The current polling method */ bool update; /**< File descriptor set need updating */ @@ -118,6 +125,10 @@ static void re_destructor(void *arg) poll_close(re); mem_deref(re->mutex); + hash_flush(re->fhl); + mem_deref(re->fhl); + list_flush(&re->fhs_delete); + list_flush(&re->fhs_cache); } @@ -153,6 +164,9 @@ int re_alloc(struct re **rep) re->mutexp = re->mutex; list_init(&re->tmrl); + list_init(&re->fhs_delete); + list_init(&re->fhs_cache); + re->tid = thrd_current(); #ifdef HAVE_EPOLL @@ -206,6 +220,18 @@ static struct re *re_get(void) } +static bool fhs_lookup(struct le *he, void *arg) +{ + re_sock_t fd = *(re_sock_t *)arg; + struct fhs *fhs = he->data; + + if (fd == fhs->fd) + return true; + + return false; +} + + static inline void re_lock(struct re *re) { int err; @@ -225,39 +251,6 @@ static inline void re_unlock(struct re *re) DEBUG_WARNING("re_unlock err\n"); } - -#ifdef WIN32 -/** - * This code emulates POSIX numbering. There is no locking, - * so zero thread-safety. - * - * @param re Poll state - * @param fd File descriptor - * - * @return fhs index if success, otherwise -1 - */ -static int lookup_fd_index(struct re* re, re_sock_t fd) { - int i; - - for (i = 0; i < re->nfds; i++) { - if (!re->fhs[i].fh) - continue; - - if (re->fhs[i].fd == fd) - return i; - } - - /* if nothing is found a linear search for the first - * zeroed handler */ - for (i = 0; i < re->maxfds; i++) { - if (!re->fhs[i].fh) - return i; - } - - return -1; -} -#endif - #if MAIN_DEBUG /** * Call the application event handler @@ -266,45 +259,55 @@ static int lookup_fd_index(struct re* re, re_sock_t fd) { * @param i File descriptor handler index * @param flags Event flags */ -static void fd_handler(struct re *re, int i, int flags) +static void fd_handler(struct fhs *fhs, int flags) { const uint64_t tick = tmr_jiffies(); uint32_t diff; DEBUG_INFO("event on fd=%d index=%d (flags=0x%02x)...\n", - re->fhs[i].fd, i, flags); + fhs->fd, i, flags); - re->fhs[i].fh(flags, re->fhs[i].arg); + fhs->fh(flags, fhs->arg); diff = (uint32_t)(tmr_jiffies() - tick); if (diff > MAX_BLOCKING) { DEBUG_WARNING("long async blocking: %u>%u ms (h=%p arg=%p)\n", diff, MAX_BLOCKING, - re->fhs[i].fh, re->fhs[i].arg); + fhs->fh, fhs->arg); } } #endif #ifdef HAVE_POLL -static int set_poll_fds(struct re *re, re_sock_t fd, int flags) +static int set_poll_fds(struct re *re, struct fhs *fhs) { if (!re->fds) return 0; + if (!fhs) + return EINVAL; + + re_sock_t fd = fhs->fd; + int flags = fhs->flags; + int index = fhs->index; + + if (index >= re->maxfds) + return EMFILE; + if (flags) - re->fds[fd].fd = fd; + re->fds[index].fd = fd; else - re->fds[fd].fd = -1; + re->fds[index].fd = -1; - re->fds[fd].events = 0; + re->fds[index].events = 0; if (flags & FD_READ) - re->fds[fd].events |= POLLIN; + re->fds[index].events |= POLLIN; if (flags & FD_WRITE) - re->fds[fd].events |= POLLOUT; + re->fds[index].events |= POLLOUT; if (flags & FD_EXCEPT) - re->fds[fd].events |= POLLERR; + re->fds[index].events |= POLLERR; return 0; } @@ -312,11 +315,17 @@ static int set_poll_fds(struct re *re, re_sock_t fd, int flags) #ifdef HAVE_EPOLL -static int set_epoll_fds(struct re *re, re_sock_t fd, int flags) +static int set_epoll_fds(struct re *re, struct fhs *fhs) { struct epoll_event event; int err = 0; + if (!fhs) + return EINVAL; + + re_sock_t fd = fhs->fd; + int flags = fhs->flags; + if (re->epfd < 0) return EBADFD; @@ -325,7 +334,7 @@ static int set_epoll_fds(struct re *re, re_sock_t fd, int flags) DEBUG_INFO("set_epoll_fds: fd=%d flags=0x%02x\n", fd, flags); if (flags) { - event.data.fd = fd; + event.data.ptr = fhs; if (flags & FD_READ) event.events |= EPOLLIN; @@ -371,11 +380,17 @@ static int set_epoll_fds(struct re *re, re_sock_t fd, int flags) #ifdef HAVE_KQUEUE -static int set_kqueue_fds(struct re *re, re_sock_t fd, int flags) +static int set_kqueue_fds(struct re *re, struct fhs *fhs) { struct kevent kev[2]; int r, n = 0; + if (!fhs) + return EINVAL; + + re_sock_t fd = fhs->fd; + int flags = fhs->flags; + memset(kev, 0, sizeof(kev)); /* always delete the events */ @@ -386,11 +401,11 @@ static int set_kqueue_fds(struct re *re, re_sock_t fd, int flags) memset(kev, 0, sizeof(kev)); if (flags & FD_WRITE) { - EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, 0); + EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, fhs); ++n; } if (flags & FD_READ) { - EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, 0); + EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, fhs); ++n; } @@ -411,53 +426,56 @@ static int set_kqueue_fds(struct re *re, re_sock_t fd, int flags) /** - * Rebuild the file descriptor mapping table. This must be done whenever + * Rebuild the file descriptor mapping. This must be done whenever * the polling method is changed. */ -static int rebuild_fds(struct re *re) +static bool rebuild_fd(struct le *he, void *arg) { - int i, err = 0; + int err = 0; + struct re *re = arg; + struct fhs *fhs = he->data; - DEBUG_INFO("rebuilding fds (nfds=%d)\n", re->nfds); /* Update fd sets */ - for (i=0; infds; i++) { - if (!re->fhs[i].fh) - continue; + if (!fhs->fh) + return false; - switch (re->method) { + switch (re->method) { #ifdef HAVE_POLL - case METHOD_POLL: - err = set_poll_fds(re, i, re->fhs[i].flags); - break; + case METHOD_POLL: + err = set_poll_fds(re, fhs); + break; #endif #ifdef HAVE_EPOLL - case METHOD_EPOLL: - err = set_epoll_fds(re, i, re->fhs[i].flags); - break; + case METHOD_EPOLL: + err = set_epoll_fds(re, fhs); + break; #endif #ifdef HAVE_KQUEUE - case METHOD_KQUEUE: - err = set_kqueue_fds(re, i, re->fhs[i].flags); - break; + case METHOD_KQUEUE: + err = set_kqueue_fds(re, fhs); + break; #endif - default: - break; - } + default: + break; + } - if (err) - break; + if (err) { + DEBUG_WARNING("rebuild_fd: set fd error: %m\n", err); + return true; } - return err; + return false; } static int poll_init(struct re *re) { + int err; + DEBUG_INFO("poll init (maxfds=%d)\n", re->maxfds); if (!re->maxfds) { @@ -465,6 +483,12 @@ static int poll_init(struct re *re) return EINVAL; } + if (!re->fhl) { + err = hash_alloc(&re->fhl, re->maxfds); + if (err) + return err; + } + switch (re->method) { #ifdef HAVE_POLL @@ -490,8 +514,7 @@ static int poll_init(struct re *re) if (re->epfd < 0 && -1 == (re->epfd = epoll_create(re->maxfds))) { - - int err = errno; + err = errno; DEBUG_WARNING("epoll_create: %m (maxfds=%d)\n", err, re->maxfds); @@ -536,7 +559,6 @@ static void poll_close(struct re *re) DEBUG_INFO("poll close\n"); - re->fhs = mem_deref(re->fhs); re->maxfds = 0; #ifdef HAVE_POLL @@ -591,6 +613,58 @@ static int poll_setup(struct re *re) } +static void fhs_destroy(void *data) +{ + struct fhs *fhs = data; + + hash_unlink(&fhs->he); +} + + +static int fhs_update(struct re *re, struct fhs **fhsp, re_sock_t fd, + int flags, fd_h *fh, void *arg) +{ + struct fhs *fhs = NULL; + struct le *he = hash_lookup(re->fhl, + hash_fast_murmur((uint32_t)fd, re->maxfds), + fhs_lookup, &fd); + + if (he) { + fhs = he->data; + } + else if (!list_isempty(&re->fhs_cache)) { + struct le *le = list_head(&re->fhs_cache); + + fhs = le->data; + list_unlink(le); + } + else { + fhs = mem_zalloc(sizeof(struct fhs), fhs_destroy); + if (!fhs) + return ENOMEM; + + fhs->index = -1; + } + + if (fhs->index == -1) + fhs->index = ++re->nfds - 1; + + fhs->fd = fd; + fhs->flags = flags; + fhs->fh = fh; + fhs->arg = arg; + + if (!he) + hash_append(re->fhl, + hash_fast_murmur((uint32_t)fd, re->maxfds), + &fhs->he, fhs); + + *fhsp = fhs; + + return 0; +} + + /** * Listen for events on a file descriptor * @@ -604,8 +678,8 @@ static int poll_setup(struct re *re) int fd_listen(re_sock_t fd, int flags, fd_h *fh, void *arg) { struct re *re = re_get(); + struct fhs *fhs = NULL; int err = 0; - int i; if (!re) { DEBUG_WARNING("fd_listen: re not ready\n"); @@ -631,41 +705,26 @@ int fd_listen(re_sock_t fd, int flags, fd_h *fh, void *arg) return err; } + err = fhs_update(re, &fhs, fd, flags, fh, arg); + if (err) + return err; + + switch (re->method) { +#ifdef HAVE_SELECT + case METHOD_SELECT: #ifdef WIN32 - /* Windows file descriptors do not follow POSIX standard ranges. */ - i = lookup_fd_index(re, fd); - if (i < 0) { - DEBUG_WARNING("fd_listen: fd=%d - no free fd_index\n", fd); - return EMFILE; - } + if (re->nfds >= DEFAULT_MAXFDS) + err = EMFILE; #else - i = fd; + if (fd + 1 >= DEFAULT_MAXFDS) + err = EMFILE; +#endif + break; #endif - - if (i >= re->maxfds) { - if (flags) { - DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x" - " - Max %d fds\n", - fd, flags, re->maxfds); - } - return EMFILE; - } - - /* Update fh set */ - if (re->fhs) { - re->fhs[i].fd = fd; - re->fhs[i].flags = flags; - re->fhs[i].fh = fh; - re->fhs[i].arg = arg; - } - - re->nfds = max(re->nfds, i+1); - - switch (re->method) { #ifdef HAVE_POLL case METHOD_POLL: - err = set_poll_fds(re, fd, flags); + err = set_poll_fds(re, fhs); break; #endif @@ -673,13 +732,13 @@ int fd_listen(re_sock_t fd, int flags, fd_h *fh, void *arg) case METHOD_EPOLL: if (re->epfd < 0) return EBADFD; - err = set_epoll_fds(re, fd, flags); + err = set_epoll_fds(re, fhs); break; #endif #ifdef HAVE_KQUEUE case METHOD_KQUEUE: - err = set_kqueue_fds(re, fd, flags); + err = set_kqueue_fds(re, fhs); break; #endif @@ -687,12 +746,26 @@ int fd_listen(re_sock_t fd, int flags, fd_h *fh, void *arg) break; } - if (err) { - if (flags && fh) { - fd_close(fd); - DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x (%m)\n", - fd, flags, err); - } +#ifndef WIN32 + if (!err) + re->max_fd = max(re->max_fd, fd + 1); +#endif + + /* Stop listening */ + if (!flags) { + fhs->index = -1; + if (re_atomic_rlx(&re->polling)) + list_append(&re->fhs_delete, &fhs->le, fhs); + else + list_append(&re->fhs_cache, &fhs->le, fhs); + hash_unlink(&fhs->he); + --re->nfds; + } + + if (err && flags) { + fd_close(fd); + DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x (%m)\n", fd, + flags, err); } return err; @@ -720,9 +793,15 @@ void fd_close(re_sock_t fd) static int fd_poll(struct re *re) { const uint64_t to = tmr_next_timeout(&re->tmrl); - int i, n, index; + int n; + struct le *he; + struct le *le; + int nfds = re->nfds; #ifdef HAVE_SELECT fd_set rfds, wfds, efds; +#ifdef WIN32 + re_sock_t sfds[DEFAULT_MAXFDS]; +#endif #endif DEBUG_INFO("next timer: %llu ms\n", to); @@ -746,19 +825,32 @@ static int fd_poll(struct re *re) FD_ZERO(&wfds); FD_ZERO(&efds); - for (i=0; infds; i++) { - re_sock_t fd = re->fhs[i].fd; - if (!re->fhs[i].fh) - continue; + uint32_t bsize = hash_bsize(re->fhl); + for (uint32_t i = 0; i < bsize; i++) { + LIST_FOREACH(hash_list_idx(re->fhl, i), he) + { + struct fhs *fhs = he->data; - if (re->fhs[i].flags & FD_READ) - FD_SET(fd, &rfds); - if (re->fhs[i].flags & FD_WRITE) - FD_SET(fd, &wfds); - if (re->fhs[i].flags & FD_EXCEPT) - FD_SET(fd, &efds); + if (!fhs->flags) + continue; +#ifdef WIN32 + if (fhs->index < DEFAULT_MAXFDS) + sfds[fhs->index] = fhs->fd; + else + return EMFILE; +#endif + + if (fhs->flags & FD_READ) + FD_SET(fhs->fd, &rfds); + if (fhs->flags & FD_WRITE) + FD_SET(fhs->fd, &wfds); + if (fhs->flags & FD_EXCEPT) + FD_SET(fhs->fd, &efds); + } } + if (re->max_fd) + nfds = re->max_fd; #ifdef WIN32 tv.tv_sec = (long) to / 1000; #else @@ -766,7 +858,7 @@ static int fd_poll(struct re *re) #endif tv.tv_usec = (uint32_t) (to % 1000) * 1000; re_unlock(re); - n = select(re->nfds, &rfds, &wfds, &efds, to ? &tv : NULL); + n = select(nfds, &rfds, &wfds, &efds, to ? &tv : NULL); re_lock(re); } break; @@ -804,47 +896,52 @@ static int fd_poll(struct re *re) if (n < 0) return ERRNO_SOCK; - /* Check for events */ - for (i=0; (n > 0) && (i < re->nfds); i++) { + for (int i = 0; (n > 0) && (i < nfds); i++) { re_sock_t fd; + struct fhs *fhs = NULL; int flags = 0; switch (re->method) { - +#ifdef HAVE_SELECT + case METHOD_SELECT: +#ifdef WIN32 + fd = sfds[i]; +#else + fd = i; +#endif + if (FD_ISSET(fd, &rfds)) + flags |= FD_READ; + if (FD_ISSET(fd, &wfds)) + flags |= FD_WRITE; + if (FD_ISSET(fd, &efds)) + flags |= FD_EXCEPT; + break; +#endif #ifdef HAVE_POLL case METHOD_POLL: - fd = i; - if (re->fds[fd].revents & POLLIN) + fd = re->fds[i].fd; + + if (re->fds[i].revents & POLLIN) flags |= FD_READ; - if (re->fds[fd].revents & POLLOUT) + if (re->fds[i].revents & POLLOUT) flags |= FD_WRITE; - if (re->fds[fd].revents & (POLLERR|POLLHUP|POLLNVAL)) + if (re->fds[i].revents & (POLLERR|POLLHUP|POLLNVAL)) flags |= FD_EXCEPT; - if (re->fds[fd].revents & POLLNVAL) { + if (re->fds[i].revents & POLLNVAL) { DEBUG_WARNING("event: fd=%d POLLNVAL" " (fds.fd=%d," " fds.events=0x%02x)\n", - fd, re->fds[fd].fd, - re->fds[fd].events); + fd, re->fds[i].fd, + re->fds[i].events); } /* Clear events */ - re->fds[fd].revents = 0; - break; -#endif -#ifdef HAVE_SELECT - case METHOD_SELECT: - fd = re->fhs[i].fd; - if (FD_ISSET(fd, &rfds)) - flags |= FD_READ; - if (FD_ISSET(fd, &wfds)) - flags |= FD_WRITE; - if (FD_ISSET(fd, &efds)) - flags |= FD_EXCEPT; + re->fds[i].revents = 0; break; #endif #ifdef HAVE_EPOLL case METHOD_EPOLL: - fd = re->events[i].data.fd; + fhs = re->events[i].data.ptr; + fd = fhs->fd; if (re->events[i].events & EPOLLIN) flags |= FD_READ; @@ -866,6 +963,7 @@ static int fd_poll(struct re *re) struct kevent *kev = &re->evlist[i]; fd = (int)kev->ident; + fhs = kev->udata; if (fd >= re->maxfds) { DEBUG_WARNING("large fd=%d\n", fd); @@ -903,17 +1001,24 @@ static int fd_poll(struct re *re) if (!flags) continue; -#ifdef WIN32 - index = i; -#else - index = fd; -#endif - if (re->fhs[index].fh) { + if (!fhs) { + he = hash_lookup( + re->fhl, + hash_fast_murmur((uint32_t)fd, re->maxfds), + fhs_lookup, &fd); + if (!he) { + DEBUG_WARNING("hash_lookup err fd=%d\n", fd); + continue; + } + fhs = he->data; + } + + if (fhs->fh && fhs->index >= 0) { #if MAIN_DEBUG - fd_handler(re, index, flags); + fd_handler(fhs, flags); #else - re->fhs[index].fh(flags, re->fhs[index].arg); + fhs->fh(flags, fhs->arg); #endif } @@ -927,6 +1032,15 @@ static int fd_poll(struct re *re) --n; } + /* Delayed fhs_cache move to avoid stale fhs pointers and allow fhs + * reuse */ + LIST_FOREACH(&re->fhs_delete, le) + { + struct fhs *fhs = le->data; + list_unlink(le); + list_append(&re->fhs_cache, &fhs->le, fhs); + } + return 0; } @@ -934,7 +1048,7 @@ static int fd_poll(struct re *re) /** * Set the maximum number of file descriptors * - * @note Only first call inits maxfds and fhs, so call after libre_init() and + * @note Only first call inits maxfds, so call after libre_init() and * before re_main() in custom applications. * * @param maxfds Max FDs. 0 to free and -1 for RLIMIT_NOFILE (Linux/Unix only) @@ -976,16 +1090,7 @@ int fd_setsize(int maxfds) #endif if (!re->maxfds) - re->maxfds = maxfds; - - if (!re->fhs) { - DEBUG_INFO("fd_setsize: maxfds=%d, allocating %u bytes\n", - re->maxfds, re->maxfds * sizeof(*re->fhs)); - - re->fhs = mem_zalloc(re->maxfds * sizeof(*re->fhs), NULL); - if (!re->fhs) - return ENOMEM; - } + re->maxfds = hash_valid_size(maxfds); return 0; } @@ -997,25 +1102,30 @@ int fd_setsize(int maxfds) void fd_debug(void) { const struct re *re = re_get(); - int i; + struct le *he; if (!re) { DEBUG_WARNING("fd_debug: re not ready\n"); return; } - if (!re->fhs) + if (!re->fhl) return; - for (i=0; infds; i++) { + uint32_t bsize = hash_bsize(re->fhl); + for (uint32_t i = 0; i < bsize; i++) { + LIST_FOREACH(hash_list_idx(re->fhl, i), he) + { + struct fhs *fhs = he->data; - if (!re->fhs[i].flags) - continue; + if (!fhs->flags) + continue; - (void)re_fprintf(stderr, - "fd %d in use: flags=%x fh=%p arg=%p\n", - i, re->fhs[i].flags, re->fhs[i].fh, - re->fhs[i].arg); + (void)re_fprintf( + stderr, + "fd %d in use: flags=%x fh=%p arg=%p\n", + fhs->fd, fhs->flags, fhs->fh, fhs->arg); + } } } @@ -1207,6 +1317,7 @@ enum poll_method poll_method_get(void) int poll_method_set(enum poll_method method) { struct re *re = re_get(); + struct le *le; int err; if (!re) { @@ -1226,10 +1337,19 @@ int poll_method_set(enum poll_method method) #endif #ifdef HAVE_SELECT case METHOD_SELECT: - if (re->maxfds > (int)FD_SETSIZE) { - DEBUG_WARNING("SELECT: maxfds > FD_SETSIZE\n"); +#ifdef WIN32 + if (re->nfds > (int)DEFAULT_MAXFDS) { + DEBUG_WARNING("poll_method_set: can not use SELECT " + "max. FDs are reached\n"); + return EMFILE; + } +#else + if (re->max_fd > (int)DEFAULT_MAXFDS) { + DEBUG_WARNING("poll_method_set: can not use SELECT " + "max. FDs are reached\n"); return EMFILE; } +#endif break; #endif #ifdef HAVE_EPOLL @@ -1256,7 +1376,12 @@ int poll_method_set(enum poll_method method) if (err) return err; - return rebuild_fds(re); + DEBUG_INFO("rebuilding fds (nfds=%d)\n", re->nfds); + le = hash_apply(re->fhl, rebuild_fd, re); + if (le) + return EBADF; + + return 0; }