Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add retryConnect() when using session pool #120

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ git clone https://github.com/vesoft-inc/nebula-cpp.git
### build

```bash
bash> cd nebula-clients/cpp && mkdir build && cd build
bash> cd nebula-cpp && mkdir build && cd build
bash> cmake ..
bash> make && sudo make install
```
Expand Down
13 changes: 10 additions & 3 deletions include/nebula/client/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <common/datatypes/DataSet.h>

#include <atomic>

#include "nebula/client/Connection.h"

namespace nebula {
Expand All @@ -25,14 +27,16 @@ class Session {
const std::string &username,
const std::string &password,
const std::string &timezoneName,
int32_t offsetSecs)
int32_t offsetSecs,
bool retryConnect)
: sessionId_(sessionId),
conn_(std::move(conn)),
pool_(pool),
username_(username),
password_(password),
timezoneName_(timezoneName),
offsetSecs_(offsetSecs) {}
offsetSecs_(offsetSecs),
retryConnect_(retryConnect) {}
Session(const Session &) = delete; // no copy
Session(Session &&session)
: sessionId_(session.sessionId_),
Expand All @@ -41,7 +45,8 @@ class Session {
username_(std::move(session.username_)),
password_(std::move(session.password_)),
timezoneName_(std::move(session.timezoneName_)),
offsetSecs_(session.offsetSecs_) {
offsetSecs_(session.offsetSecs_),
retryConnect_(session.retryConnect_) {
session.sessionId_ = -1;
session.pool_ = nullptr;
session.offsetSecs_ = 0;
Expand Down Expand Up @@ -117,6 +122,8 @@ class Session {
// empty means not a named timezone
std::string timezoneName_;
int32_t offsetSecs_;
bool retryConnect_{true};
std::atomic<bool> connectionIsBroken_{false};
};

} // namespace nebula
6 changes: 6 additions & 0 deletions src/client/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ ExecutionResponse Connection::executeWithParameter(
ExecutionResponse resp;
try {
resp = client_->future_executeWithParameter(sessionId, stmt, parameters).get();
} catch (const apache::thrift::transport::TTransportException &ex) {
resp = ExecutionResponse{ErrorCode::E_FAIL_TO_CONNECT,
0,
nullptr,
nullptr,
std::make_unique<std::string>(ex.what())};
} catch (const std::exception &ex) {
resp = ExecutionResponse{
ErrorCode::E_RPC_FAILURE, 0, nullptr, nullptr, std::make_unique<std::string>(ex.what())};
Expand Down
4 changes: 2 additions & 2 deletions src/client/ConnectionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ void ConnectionPool::close() {
Session ConnectionPool::getSession(const std::string &username,
const std::string &password,
bool retryConnect) {
(void)retryConnect;
Connection conn = getConnection();
auto resp = conn.authenticate(username, password);
if (resp.errorCode != ErrorCode::SUCCEEDED || resp.sessionId == nullptr) {
Expand All @@ -55,7 +54,8 @@ Session ConnectionPool::getSession(const std::string &username,
username,
password,
*resp.timeZoneName,
*resp.timeZoneOffsetSeconds);
*resp.timeZoneOffsetSeconds,
retryConnect);
}

Connection ConnectionPool::getConnection() {
Expand Down
35 changes: 33 additions & 2 deletions src/client/Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

#include "nebula/client/Session.h"

#include "common/graph/Response.h"
#include "common/time/TimeConversion.h"
#include "nebula/client/ConnectionPool.h"

namespace nebula {

ExecutionResponse Session::execute(const std::string &stmt) {
return ExecutionResponse(conn_.execute(sessionId_, stmt));
return executeWithParameter(stmt, {});
}

void Session::asyncExecute(const std::string &stmt, ExecuteCallback cb) {
Expand All @@ -22,7 +23,37 @@ void Session::asyncExecute(const std::string &stmt, ExecuteCallback cb) {

ExecutionResponse Session::executeWithParameter(
const std::string &stmt, const std::unordered_map<std::string, Value> &parameters) {
return ExecutionResponse(conn_.executeWithParameter(sessionId_, stmt, parameters));
if (connectionIsBroken_ && retryConnect_) {
Shylock-Hg marked this conversation as resolved.
Show resolved Hide resolved
if (retryConnect() == nebula::ErrorCode::SUCCEEDED) {
return ExecutionResponse(conn_.executeWithParameter(sessionId_, stmt, parameters));
} else {
return ExecutionResponse{ErrorCode::E_RPC_FAILURE,
0,
nullptr,
nullptr,
std::make_unique<std::string>("All servers are broken.")};
}
}
auto resp = conn_.executeWithParameter(sessionId_, stmt, parameters);
if (resp.errorCode == nebula::ErrorCode::SUCCEEDED) {
return resp;
} else if (resp.errorCode == nebula::ErrorCode::E_FAIL_TO_CONNECT) {
connectionIsBroken_ = true;
if (retryConnect_) {
if (retryConnect() == nebula::ErrorCode::SUCCEEDED) {
connectionIsBroken_ = false;
return ExecutionResponse(conn_.executeWithParameter(sessionId_, stmt, parameters));
} else {
connectionIsBroken_ = true;
return ExecutionResponse{ErrorCode::E_RPC_FAILURE,
0,
nullptr,
nullptr,
std::make_unique<std::string>("All servers are broken.")};
}
}
}
return resp;
}

void Session::asyncExecuteWithParameter(const std::string &stmt,
Expand Down
17 changes: 1 addition & 16 deletions src/client/SessionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,7 @@ bool SessionPool::init() {
}

ExecutionResponse SessionPool::execute(const std::string &stmt) {
auto result = getIdleSession();
if (result.second) {
auto resp = result.first.execute(stmt);
if (resp.spaceName != nullptr && *resp.spaceName != config_.spaceName_) {
// switch to origin space
result.first.execute("USE " + config_.spaceName_);
}
giveBack(std::move(result.first));
return resp;
} else {
return ExecutionResponse{ErrorCode::E_DISCONNECTED,
0,
nullptr,
nullptr,
std::make_unique<std::string>("No idle session.")};
}
return executeWithParameter(stmt, {});
}

ExecutionResponse SessionPool::executeWithParameter(
Expand Down