Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
add min max connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt committed Nov 1, 2023
1 parent 420b17f commit 53d9958
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 68 deletions.
2 changes: 0 additions & 2 deletions .cargo/config

This file was deleted.

8 changes: 7 additions & 1 deletion extension/include/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@ class ConnectionPool
sql::mysql::MySQL_Driver *driver;
std::queue<sql::Connection*> connections = std::queue<sql::Connection*>();
std::mutex connectionsMutex;
int maxPoolSize;
int minPoolSize;
std::string host;
std::string username;
std::string password;

public:
ConnectionPool(int poolSize, const std::string& host, const std::string& username, const std::string& password);
ConnectionPool(int minPoolSize, int maxPoolSize, const std::string& host, const std::string& username, const std::string& password);
sql::Connection *createConnection(int retryLeftCount);
sql::Connection *getConnection();
void releaseConnection(sql::Connection *connection);
void close();
Expand Down
2 changes: 1 addition & 1 deletion extension/include/mysql_connection_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class MySQLConnectionManager {
static std::mutex mapMutex;

public:
static ConnectionPool* getConnectionPool(int poolSize, const std::string& host, const std::string& username, const std::string& password);
static ConnectionPool* getConnectionPool(int minPoolSize, int maxPoolSize, const std::string& host, const std::string& username, const std::string& password);
static void close(const std::string& host, const std::string& username, const std::string& password);
~MySQLConnectionManager();
};
2 changes: 1 addition & 1 deletion extension/src/duckdb_function/mysql_attach.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static void AttachFunction(ClientContext &context, TableFunctionInput &data_p, D
return;
}

gstate.pool = MySQLConnectionManager::getConnectionPool(5, data.host, data.username, data.password);
gstate.pool = MySQLConnectionManager::getConnectionPool(1, 5, data.host, data.username, data.password);
auto conn = gstate.pool->getConnection();

auto dconn = Connection(context.db->GetDatabase(context));
Expand Down
6 changes: 3 additions & 3 deletions extension/src/duckdb_function/mysql_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ static unique_ptr<LocalTableFunctionState> MysqlInitLocalState(ExecutionContext

auto local_state = make_uniq<MysqlLocalState>();
local_state->column_ids = input.column_ids;
local_state->pool = MySQLConnectionManager::getConnectionPool(5, bind_data.host, bind_data.username, bind_data.password);
local_state->pool = MySQLConnectionManager::getConnectionPool(1, 5, bind_data.host, bind_data.username, bind_data.password);
local_state->conn = (local_state->pool)->getConnection();
local_state->filters = input.filters.get();

Expand Down Expand Up @@ -279,7 +279,7 @@ static std::tuple<vector<MysqlColumnInfo>, vector<string>, vector<LogicalType>,
// can't scan a table without columns (yes those exist)
if (res2->rowsCount() == 0)
{
throw InvalidInputException("Table %s does not contain any columns.", table_name);
throw InvalidInputException("Table %s.%s does not contain any columns OR does not exist in the DB.", schema_name, table_name);
}

// set the column types in a MysqlColumnInfo struct by iterating over the result set
Expand Down Expand Up @@ -356,7 +356,7 @@ static unique_ptr<FunctionData> MysqlBind(ClientContext &context, TableFunctionB
bind_data->schema_name = input.inputs[3].GetValue<string>();
bind_data->table_name = input.inputs[4].GetValue<string>();

auto connection_pool = MySQLConnectionManager::getConnectionPool(5, bind_data->host, bind_data->username, bind_data->password);
auto connection_pool = MySQLConnectionManager::getConnectionPool(1, 5, bind_data->host, bind_data->username, bind_data->password);

// // Create threads for concurrent execution
// std::thread t1(GetNumberOfShard, connection_pool, bind_data.get());
Expand Down
49 changes: 39 additions & 10 deletions extension/src/util/connection_pool.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
#include "duckdb.hpp"
#include "connection_pool.hpp"

ConnectionPool::ConnectionPool(int poolSize, const std::string &host, const std::string &username, const std::string &password)
ConnectionPool::ConnectionPool(int minPoolSize, int maxPoolSize, const std::string& host, const std::string& username, const std::string& password):
minPoolSize(minPoolSize), maxPoolSize(maxPoolSize), host(host), username(username), password(password)
{
// std::cout << "Creating connection pool with size " << poolSize << std::endl;

std::vector<std::thread> threads(poolSize);
std::vector<std::thread> threads(minPoolSize);
driver = sql::mysql::get_mysql_driver_instance();

for (int i = 0; i < poolSize; ++i)
for (int i = 0; i < minPoolSize; ++i)
{
threads[i] = std::thread([this, host, username, password]()
{
// std::cout << "Creating connection host " << host << " username " << username << " password " << password << std::endl;
try {
sql::Connection* connection = driver->connect(host, username, password);
// std::cout << "Connection created" << std::endl;
std::cout << "Connection created" << std::endl;
// Add a lock to ensure mutual exclusion when accessing the connections vector
std::lock_guard<std::mutex> lock(connectionsMutex);
connections.push(connection);
Expand All @@ -30,8 +32,6 @@ ConnectionPool::ConnectionPool(int poolSize, const std::string &host, const std:
});
}



// std::cout << "Waiting for threads to finish" << std::endl;
// Wait for all threads to finish
for (std::thread &thread : threads)
Expand All @@ -42,24 +42,53 @@ ConnectionPool::ConnectionPool(int poolSize, const std::string &host, const std:
//std::cout << "Threads finished" << std::endl;
}

sql::Connection *ConnectionPool::createConnection(int retryLeftCount) {
if(retryLeftCount == 0){
throw duckdb::InvalidInputException("Unable to create connection to the host %s with username %s", this->host, this->username);
} else {
try {
return this->driver->connect(this->host, this->username, this->password);
} catch (...) {
return createConnection(retryLeftCount - 1);
}
}
}

sql::Connection *ConnectionPool::getConnection()
{
// Add a lock to ensure mutual exclusion when accessing the connections vector
std::lock_guard<std::mutex> lock(connectionsMutex);
// std::cout << "Retrieving connection from pool" << std::endl;
std::cout << "Retrieving connection from pool" << std::endl;
// Retrieve the next available connection in a round-robin fashion

sql::Connection *connection = nullptr;

if(connections.empty()) {
std::cout << "Connection pool is empty" << std::endl;
connection = createConnection(3);
} else {
// front() returns a reference to the first element in the vector
sql::Connection *connection = connections.front();
connections.pop();
connection = connections.front();
connections.pop();
}

// check that the connection is still valid
if (connection->isValid()) {
// std::cout << "Connection is valid" << std::endl;
} else {
// std::cout << "Connection is invalid" << std::endl;
// if the connection is invalid, create a new one
delete connection;
connection = createConnection(3);
}
return connection;
}

void ConnectionPool::releaseConnection(sql::Connection *connection)
{
// Add a lock to ensure mutual exclusion when accessing the connections vector
std::lock_guard<std::mutex> lock(connectionsMutex);
// std::cout << "Releasing connection back to pool" << std::endl;
std::cout << "Releasing connection back to pool" << std::endl;
// Add the released connection back to the pool for reuse
connections.push(connection);
}
Expand Down
11 changes: 9 additions & 2 deletions extension/src/util/mysql_connection_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@
std::map<std::tuple<std::string, std::string, std::string>, ConnectionPool *> MySQLConnectionManager::connectionMap;
std::mutex MySQLConnectionManager::mapMutex;

ConnectionPool *MySQLConnectionManager::getConnectionPool(int poolSize, const std::string &host, const std::string &username, const std::string &password)
ConnectionPool *MySQLConnectionManager::getConnectionPool(
int minPoolSize,
int maxPoolSize,
const std::string &host,
const std::string &username,
const std::string &password
)
{
// std::cout << "Retrieving connection pool" << std::endl;

std::lock_guard<std::mutex> lock(mapMutex);

auto key = std::make_tuple(host, username, password);
Expand All @@ -20,7 +27,7 @@ ConnectionPool *MySQLConnectionManager::getConnectionPool(int poolSize, const st

// std::cout << "Connection pool doesn't exist, create new!" << std::endl;
// ConnectionPool doesn't exist, create a new instance and add it to the map
ConnectionPool *connectionPool = new ConnectionPool(poolSize, host, username, password);
ConnectionPool *connectionPool = new ConnectionPool(minPoolSize, maxPoolSize, host, username, password);
connectionMap[key] = connectionPool;
return connectionPool;
}
Expand Down
48 changes: 0 additions & 48 deletions extension/src/util/synchronous_queue.cpp

This file was deleted.

0 comments on commit 53d9958

Please sign in to comment.