diff --git a/src/include/pool.h b/src/include/pool.h index 5ef3c2d3..f5e816a0 100644 --- a/src/include/pool.h +++ b/src/include/pool.h @@ -91,6 +91,13 @@ pgagroal_validation(void); void pgagroal_flush(int mode, char* database); +/** + * Flush the pool for a specific server + * @param server The server + */ +void +pgagroal_flush_server(signed char server); + /** * Prefill the pool * @param initial Use initial size diff --git a/src/libpgagroal/pool.c b/src/libpgagroal/pool.c index bd2e13f4..8909ce75 100644 --- a/src/libpgagroal/pool.c +++ b/src/libpgagroal/pool.c @@ -193,7 +193,7 @@ pgagroal_get_connection(char* username, char* database, bool reuse, bool transac if (!fork()) { - pgagroal_flush(FLUSH_GRACEFULLY, "*"); + pgagroal_flush_server(server); } if (config->failover) @@ -798,6 +798,68 @@ pgagroal_flush(int mode, char* database) exit(0); } +void +pgagroal_flush_server(signed char server) +{ + struct configuration* config; + + pgagroal_start_logging(); + pgagroal_memory_init(); + + config = (struct configuration*)shmem; + + pgagroal_log_debug("pgagroal_flush_server"); + for (int i = 0; i < config->max_connections; i++) + { + if (config->connections[i].server == server) + { + switch (atomic_load(&config->states[i])) + { + case STATE_NOTINIT: + case STATE_INIT: + /* Do nothing */ + break; + case STATE_FREE: + atomic_store(&config->states[i], STATE_GRACEFULLY); + if (pgagroal_socket_isvalid(config->connections[i].fd)) + { + pgagroal_write_terminate(NULL, config->connections[i].fd); + } + pgagroal_prometheus_connection_flush(); + pgagroal_tracking_event_slot(TRACKER_FLUSH, i); + pgagroal_kill_connection(i, NULL); + break; + case STATE_IN_USE: + case STATE_GRACEFULLY: + case STATE_FLUSH: + atomic_store(&config->states[i], STATE_GRACEFULLY); + break; + case STATE_IDLE_CHECK: + case STATE_VALIDATION: + case STATE_REMOVE: + atomic_store(&config->states[i], STATE_GRACEFULLY); + break; + default: + break; + } + } + } + + if (config->number_of_users > 0 && config->number_of_limits > 0) + { + if (!fork()) + { + pgagroal_prefill(false); + } + } + + pgagroal_pool_status(); + pgagroal_memory_destroy(); + pgagroal_stop_logging(); + + exit(0); +} + void pgagroal_prefill(bool initial) { diff --git a/src/libpgagroal/server.c b/src/libpgagroal/server.c index b6a45942..8ec7be29 100644 --- a/src/libpgagroal/server.c +++ b/src/libpgagroal/server.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -216,7 +217,8 @@ int pgagroal_server_failover(int slot) { signed char primary; - int old_primary; + signed char old_primary; + int ret = 1; struct configuration* config = NULL; config = (struct configuration*)shmem; @@ -227,10 +229,15 @@ pgagroal_server_failover(int slot) if (atomic_compare_exchange_strong(&config->servers[old_primary].state, &primary, SERVER_FAILOVER)) { - return failover(config->connections[slot].server); + ret = failover(old_primary); + + if (!fork()) + { + pgagroal_flush_server(old_primary); + } } - return 1; + return ret; } int diff --git a/src/main.c b/src/main.c index 12f5dc7e..36fdb438 100644 --- a/src/main.c +++ b/src/main.c @@ -1430,12 +1430,30 @@ accept_mgt_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) break; case MANAGEMENT_SWITCH_TO: pgagroal_log_debug("pgagroal: Management switch to"); + int old_primary = -1; + signed char server_state; + for (int i = 0; old_primary == -1 && i < config->number_of_servers; i++) + { + server_state = atomic_load(&config->servers[i].state); + if (server_state == SERVER_PRIMARY) + { + old_primary = i; + } + } + if (!pgagroal_server_switch(payload_s)) { if (!fork()) { shutdown_ports(); - pgagroal_flush(FLUSH_GRACEFULLY, "*"); + if (old_primary != -1) + { + pgagroal_flush_server(old_primary); + } + else + { + pgagroal_flush(FLUSH_GRACEFULLY, "*"); + } } pgagroal_prometheus_failed_servers(); }