Skip to content

Commit

Permalink
add retryConnect() when using session pool
Browse files Browse the repository at this point in the history
  • Loading branch information
songqing committed Aug 18, 2023
1 parent aef6d20 commit ff7d26e
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 23 deletions.
13 changes: 10 additions & 3 deletions include/nebula/client/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <common/datatypes/DataSet.h>

#include <atomic>

#include "nebula/client/Connection.h"

namespace nebula {
Expand All @@ -25,14 +27,16 @@ class Session {
const std::string &username,
const std::string &password,
const std::string &timezoneName,
int32_t offsetSecs)
int32_t offsetSecs,
bool retryConnect)
: sessionId_(sessionId),
conn_(std::move(conn)),
pool_(pool),
username_(username),
password_(password),
timezoneName_(timezoneName),
offsetSecs_(offsetSecs) {}
offsetSecs_(offsetSecs),
retryConnect_(retryConnect) {}
Session(const Session &) = delete; // no copy
Session(Session &&session)
: sessionId_(session.sessionId_),
Expand All @@ -41,7 +45,8 @@ class Session {
username_(std::move(session.username_)),
password_(std::move(session.password_)),
timezoneName_(std::move(session.timezoneName_)),
offsetSecs_(session.offsetSecs_) {
offsetSecs_(session.offsetSecs_),
retryConnect_(session.retryConnect_) {
session.sessionId_ = -1;
session.pool_ = nullptr;
session.offsetSecs_ = 0;
Expand Down Expand Up @@ -117,6 +122,8 @@ class Session {
// empty means not a named timezone
std::string timezoneName_;
int32_t offsetSecs_;
bool retryConnect_{true};
std::atomic<bool> connectionIsBroken_{false};
};

} // namespace nebula
6 changes: 6 additions & 0 deletions src/client/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ ExecutionResponse Connection::executeWithParameter(
ExecutionResponse resp;
try {
resp = client_->future_executeWithParameter(sessionId, stmt, parameters).get();
} catch (const apache::thrift::transport::TTransportException &ex) {
resp = ExecutionResponse{ErrorCode::E_FAIL_TO_CONNECT,
0,
nullptr,
nullptr,
std::make_unique<std::string>(ex.what())};
} catch (const std::exception &ex) {
resp = ExecutionResponse{
ErrorCode::E_RPC_FAILURE, 0, nullptr, nullptr, std::make_unique<std::string>(ex.what())};
Expand Down
4 changes: 2 additions & 2 deletions src/client/ConnectionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ void ConnectionPool::close() {
Session ConnectionPool::getSession(const std::string &username,
const std::string &password,
bool retryConnect) {
(void)retryConnect;
Connection conn = getConnection();
auto resp = conn.authenticate(username, password);
if (resp.errorCode != ErrorCode::SUCCEEDED || resp.sessionId == nullptr) {
Expand All @@ -55,7 +54,8 @@ Session ConnectionPool::getSession(const std::string &username,
username,
password,
*resp.timeZoneName,
*resp.timeZoneOffsetSeconds);
*resp.timeZoneOffsetSeconds,
retryConnect);
}

Connection ConnectionPool::getConnection() {
Expand Down
35 changes: 33 additions & 2 deletions src/client/Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

#include "nebula/client/Session.h"

#include "common/graph/Response.h"
#include "common/time/TimeConversion.h"
#include "nebula/client/ConnectionPool.h"

namespace nebula {

ExecutionResponse Session::execute(const std::string &stmt) {
return ExecutionResponse(conn_.execute(sessionId_, stmt));
return executeWithParameter(stmt, {});
}

void Session::asyncExecute(const std::string &stmt, ExecuteCallback cb) {
Expand All @@ -22,7 +23,37 @@ void Session::asyncExecute(const std::string &stmt, ExecuteCallback cb) {

ExecutionResponse Session::executeWithParameter(
const std::string &stmt, const std::unordered_map<std::string, Value> &parameters) {
return ExecutionResponse(conn_.executeWithParameter(sessionId_, stmt, parameters));
if (connectionIsBroken_ && retryConnect_) {
if (retryConnect() == nebula::ErrorCode::SUCCEEDED) {
return ExecutionResponse(conn_.executeWithParameter(sessionId_, stmt, parameters));
} else {
return ExecutionResponse{ErrorCode::E_RPC_FAILURE,
0,
nullptr,
nullptr,
std::make_unique<std::string>("All servers are broken.")};
}
}
auto resp = conn_.executeWithParameter(sessionId_, stmt, parameters);
if (resp.errorCode == nebula::ErrorCode::SUCCEEDED) {
return resp;
} else if (resp.errorCode == nebula::ErrorCode::E_FAIL_TO_CONNECT) {
connectionIsBroken_ = true;
if (retryConnect_) {
if (retryConnect() == nebula::ErrorCode::SUCCEEDED) {
connectionIsBroken_ = false;
return ExecutionResponse(conn_.executeWithParameter(sessionId_, stmt, parameters));
} else {
connectionIsBroken_ = true;
return ExecutionResponse{ErrorCode::E_RPC_FAILURE,
0,
nullptr,
nullptr,
std::make_unique<std::string>("All servers are broken.")};
}
}
}
return resp;
}

void Session::asyncExecuteWithParameter(const std::string &stmt,
Expand Down
18 changes: 2 additions & 16 deletions src/client/SessionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "nebula/client/SessionPool.h"

#include <folly/json.h>
#include <glog/logging.h>

#include "common/time/TimeConversion.h"
#include "nebula/client/ConnectionPool.h"
Expand Down Expand Up @@ -39,22 +40,7 @@ bool SessionPool::init() {
}

ExecutionResponse SessionPool::execute(const std::string &stmt) {
auto result = getIdleSession();
if (result.second) {
auto resp = result.first.execute(stmt);
if (resp.spaceName != nullptr && *resp.spaceName != config_.spaceName_) {
// switch to origin space
result.first.execute("USE " + config_.spaceName_);
}
giveBack(std::move(result.first));
return resp;
} else {
return ExecutionResponse{ErrorCode::E_DISCONNECTED,
0,
nullptr,
nullptr,
std::make_unique<std::string>("No idle session.")};
}
return executeWithParameter(stmt, {});
}

ExecutionResponse SessionPool::executeWithParameter(
Expand Down

0 comments on commit ff7d26e

Please sign in to comment.