Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix handling with IO error #123

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/nebula/client/SessionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ struct SessionPoolConfig {
std::string password_;
std::vector<std::string> addrs_; // the list of graph addresses
std::string spaceName_;
// Socket timeout and Socket connection timeout, unit: seconds
// Socket timeout and Socket connection timeout, unit: milliseconds
std::uint32_t timeout_{0};
// The idleTime of the connection, unit: seconds
// The idleTime of the connection, unit: milliseconds
// If connection's idle time is longer than idleTime, it will be delete
// 0 value means the connection will not expire
std::uint32_t idleTime_{0};
Expand Down
44 changes: 19 additions & 25 deletions src/client/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,7 @@ AuthResponse Connection::authenticate(const std::string &user, const std::string
}

ExecutionResponse Connection::execute(int64_t sessionId, const std::string &stmt) {
if (client_ == nullptr) {
return ExecutionResponse{ErrorCode::E_DISCONNECTED,
0,
nullptr,
nullptr,
std::make_unique<std::string>("Not open connection.")};
}

ExecutionResponse resp;
try {
resp = client_->future_execute(sessionId, stmt).get();
} catch (const std::exception &ex) {
resp = ExecutionResponse{
ErrorCode::E_RPC_FAILURE, 0, nullptr, nullptr, std::make_unique<std::string>(ex.what())};
}

return resp;
return executeWithParameter(sessionId, stmt, {});
}

void Connection::asyncExecute(int64_t sessionId, const std::string &stmt, ExecuteCallback cb) {
Expand Down Expand Up @@ -188,15 +172,26 @@ ExecutionResponse Connection::executeWithParameter(
std::make_unique<std::string>("Not open connection.")};
}

using TTransportException = apache::thrift::transport::TTransportException;
ExecutionResponse resp;
try {
resp = client_->future_executeWithParameter(sessionId, stmt, parameters).get();
} catch (const apache::thrift::transport::TTransportException &ex) {
resp = ExecutionResponse{ErrorCode::E_FAIL_TO_CONNECT,
0,
nullptr,
nullptr,
std::make_unique<std::string>(ex.what())};
} catch (const TTransportException &ex) {
auto errType = ex.getType();
std::string errMsg = ex.what();
if (errType == TTransportException::END_OF_FILE ||
(errType == TTransportException::INTERNAL_ERROR &&
errMsg.find("Connection reset by peer") != std::string::npos)) {
resp = ExecutionResponse{
ErrorCode::E_FAIL_TO_CONNECT, 0, nullptr, nullptr, std::make_unique<std::string>(errMsg)};
} else if (errType == TTransportException::TIMED_OUT) {
resp = ExecutionResponse{
ErrorCode::E_SESSION_TIMEOUT, 0, nullptr, nullptr, std::make_unique<std::string>(errMsg)};

} else {
resp = ExecutionResponse{
ErrorCode::E_RPC_FAILURE, 0, nullptr, nullptr, std::make_unique<std::string>(errMsg)};
}
} catch (const std::exception &ex) {
resp = ExecutionResponse{
ErrorCode::E_RPC_FAILURE, 0, nullptr, nullptr, std::make_unique<std::string>(ex.what())};
Expand Down Expand Up @@ -296,8 +291,7 @@ void Connection::close() {

bool Connection::ping() {
auto resp = execute(0 /*Only check connection*/, "YIELD 1");
if (resp.errorCode == ErrorCode::E_RPC_FAILURE ||
resp.errorCode == ErrorCode::E_FAIL_TO_CONNECT ||
if (resp.errorCode == ErrorCode::E_FAIL_TO_CONNECT ||
resp.errorCode == ErrorCode::E_DISCONNECTED) {
DLOG(ERROR) << "Ping failed: " << *resp.errorMsg;
return false;
Expand Down
4 changes: 1 addition & 3 deletions src/client/tests/ConnectionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ TEST_F(ConnectionTest, Timeout) {
// execute
resp = c.execute(*authResp.sessionId,
"use conn_test;GO 100000 STEPS FROM 'Tim Duncan' OVER like YIELD like._dst;");
ASSERT_TRUE(resp.errorCode == nebula::ErrorCode::E_RPC_FAILURE ||
resp.errorCode == nebula::ErrorCode::E_FAIL_TO_CONNECT)
<< *resp.errorMsg;
ASSERT_TRUE(resp.errorCode == nebula::ErrorCode::E_SESSION_TIMEOUT) << *resp.errorMsg;

resp =
c.execute(*authResp.sessionId,
Expand Down
4 changes: 1 addition & 3 deletions src/client/tests/SessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,7 @@ TEST_F(SessionTest, Timeout) {
// execute
resp = session.execute(
"use session_test;GO 100000 STEPS FROM 'Tim Duncan' OVER like YIELD like._dst;");
ASSERT_TRUE(resp.errorCode == nebula::ErrorCode::E_FAIL_TO_CONNECT ||
resp.errorCode == nebula::ErrorCode::E_RPC_FAILURE)
<< *resp.errorMsg;
ASSERT_TRUE(resp.errorCode == nebula::ErrorCode::E_SESSION_TIMEOUT) << *resp.errorMsg;

resp = session.execute(
"SHOW QUERIES "
Expand Down