Skip to content

Commit

Permalink
Fix RPC TIMEOUT error
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu committed Mar 17, 2022
1 parent caddc4d commit 13eaa86
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 118 deletions.
6 changes: 3 additions & 3 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) {
return client->semifuture_chainUpdateEdge(r);
return client->future_chainUpdateEdge(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
Expand Down Expand Up @@ -106,7 +106,7 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) {
return client->semifuture_chainAddEdges(r);
return client->future_chainAddEdges(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
Expand Down Expand Up @@ -163,7 +163,7 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->semifuture_chainDeleteEdges(r);
return client->future_chainDeleteEdges(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
Expand Down
40 changes: 20 additions & 20 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::getNeighbors(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::GetNeighborsRequest& r) {
return client->semifuture_getNeighbors(r);
return client->future_getNeighbors(r);
});
}

Expand Down Expand Up @@ -142,7 +142,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::AddVerticesRequest& r) {
return client->semifuture_addVertices(r);
return client->future_addVertices(r);
});
}

Expand Down Expand Up @@ -180,8 +180,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addEdges(const CommonReq
std::move(requests),
[useToss = param.useExperimentalFeature](ThriftClientType* client,
const cpp2::AddEdgesRequest& r) {
return useToss ? client->semifuture_chainAddEdges(r)
: client->semifuture_addEdges(r);
return useToss ? client->future_chainAddEdges(r)
: client->future_addEdges(r);
});
}

Expand Down Expand Up @@ -237,7 +237,7 @@ StorageRpcRespFuture<cpp2::GetPropResponse> StorageClient::getProps(

return collectResponse(
param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) {
return client->semifuture_getProps(r);
return client->future_getProps(r);
});
}

Expand Down Expand Up @@ -270,8 +270,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteEdges(
std::move(requests),
[useToss = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return useToss ? client->semifuture_chainDeleteEdges(r)
: client->semifuture_deleteEdges(r);
return useToss ? client->future_chainDeleteEdges(r)
: client->future_deleteEdges(r);
});
}

Expand Down Expand Up @@ -303,7 +303,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteVertices(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteVerticesRequest& r) {
return client->semifuture_deleteVertices(r);
return client->future_deleteVertices(r);
});
}

Expand Down Expand Up @@ -335,7 +335,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteTags(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteTagsRequest& r) {
return client->semifuture_deleteTags(r);
return client->future_deleteTags(r);
});
}

Expand Down Expand Up @@ -385,7 +385,7 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
host.value(),
req,
[](ThriftClientType* client, const cpp2::UpdateVertexRequest& r) {
return client->semifuture_updateVertex(r);
return client->future_updateVertex(r);
});
}

Expand Down Expand Up @@ -435,8 +435,8 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
req,
[useExperimentalFeature = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::UpdateEdgeRequest& r) {
return useExperimentalFeature ? client->semifuture_chainUpdateEdge(r)
: client->semifuture_updateEdge(r);
return useExperimentalFeature ? client->future_chainUpdateEdge(r)
: client->future_updateEdge(r);
});
}

Expand Down Expand Up @@ -466,7 +466,7 @@ folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID s

return getResponse(
evb, host.value(), req, [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
return client->semifuture_getUUID(r);
return client->future_getUUID(r);
});
}

Expand Down Expand Up @@ -514,7 +514,7 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> StorageClient::lookupIndex(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::LookupIndexRequest& r) {
return client->semifuture_lookupIndex(r);
return client->future_lookupIndex(r);
});
}

Expand Down Expand Up @@ -543,7 +543,7 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::lookupAndTravers
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::LookupAndTraverseRequest& r) {
return client->semifuture_lookupAndTraverse(r);
return client->future_lookupAndTraverse(r);
});
}

Expand Down Expand Up @@ -574,7 +574,7 @@ StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanEdge(

return collectResponse(
param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::ScanEdgeRequest& r) {
return client->semifuture_scanEdge(r);
return client->future_scanEdge(r);
});
}

Expand Down Expand Up @@ -606,7 +606,7 @@ StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanVertex(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::ScanVertexRequest& r) {
return client->semifuture_scanVertex(r);
return client->future_scanVertex(r);
});
}

Expand All @@ -632,7 +632,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::KVGetResponse>> StorageClient::get(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVGetRequest& r) {
return client->semifuture_get(r);
return client->future_get(r);
});
}

Expand All @@ -657,7 +657,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> StorageClient::put(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVPutRequest& r) {
return client->semifuture_put(r);
return client->future_put(r);
});
}

Expand All @@ -682,7 +682,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> StorageClient::remove(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVRemoveRequest& r) {
return client->semifuture_remove(r);
return client->future_remove(r);
});
}

Expand Down
14 changes: 9 additions & 5 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <folly/ExceptionWrapper.h>
#include <folly/Optional.h>
#include <folly/Try.h>
#include <folly/futures/Future.h>
#include <glog/logging.h>

#include <unordered_map>
Expand Down Expand Up @@ -146,19 +147,22 @@ template <typename ClientType, typename ClientManagerType>
template <class Request, class RemoteFunc, class Response>
folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerType>::getResponse(
folly::EventBase* evb, const HostAddr& host, const Request& request, RemoteFunc&& remoteFunc) {
static_assert(folly::isSemiFuture<
static_assert(folly::isFuture<
typename std::result_of<RemoteFunc(ClientType*, const Request&)>::type>::value);

stats::StatsManager::addValue(kNumRpcSentToStoraged);
if (evb == nullptr) {
evb = DCHECK_NOTNULL(ioThreadPool_)->getEventBase();
}

auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.get_space_id();
auto partsId = getReqPartsId(request);
return remoteFunc(client.get(), request)
.via(evb)
return folly::via(evb)
.thenValue([func = std::move(remoteFunc), req = std::move(request), evb, host, this](auto&&) {
// NOTE: Create new channel on each thread to avoid TIMEOUT RPC error
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
return func(client.get(), req);
})
.thenValue([spaceId, this](Response&& resp) mutable -> StatusOr<Response> {
auto& result = resp.get_result();
for (auto& part : result.get_failed_parts()) {
Expand Down Expand Up @@ -189,7 +193,7 @@ folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerTyp
}
return std::move(resp);
})
.thenError([this, host, spaceId, partsId = std::move(partsId)](
.thenError([partsId = std::move(partsId), host, spaceId, this](
folly::exception_wrapper&& exWrapper) mutable -> StatusOr<Response> {
stats::StatsManager::addValue(kNumRpcSentToStoragedFailed);

Expand Down
82 changes: 41 additions & 41 deletions src/graph/executor/test/StorageServerStub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
UNUSED(request); \
auto promise = std::make_shared<folly::Promise<respType>>(); \
respType dummyResp; \
auto f = promise->getSemiFuture(); \
auto f = promise->getFuture(); \
promise->setValue(std::move(dummyResp)); \
return f;

Expand Down Expand Up @@ -50,104 +50,104 @@ void GraphStorageLocalServer::stop() {
serving_ = false;
}

folly::SemiFuture<cpp2::GetNeighborsResponse> GraphStorageLocalServer::semifuture_getNeighbors(
folly::Future<cpp2::GetNeighborsResponse> GraphStorageLocalServer::future_getNeighbors(
const cpp2::GetNeighborsRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, semifuture_getNeighbors);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, future_getNeighbors);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_addVertices(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_addVertices(
const cpp2::AddVerticesRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_addVertices);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_addVertices);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_chainAddEdges(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_chainAddEdges(
const cpp2::AddEdgesRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_chainAddEdges);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_chainAddEdges);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_addEdges(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_addEdges(
const cpp2::AddEdgesRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_addEdges);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_addEdges);
}

folly::SemiFuture<cpp2::GetPropResponse> GraphStorageLocalServer::semifuture_getProps(
folly::Future<cpp2::GetPropResponse> GraphStorageLocalServer::future_getProps(
const cpp2::GetPropRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetPropResponse, semifuture_getProps);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetPropResponse, future_getProps);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_deleteEdges(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_deleteEdges(
const cpp2::DeleteEdgesRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_deleteEdges);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteEdges);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_chainDeleteEdges(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_chainDeleteEdges(
const cpp2::DeleteEdgesRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_chainDeleteEdges);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_chainDeleteEdges);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_deleteVertices(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_deleteVertices(
const cpp2::DeleteVerticesRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_deleteVertices);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteVertices);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_deleteTags(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_deleteTags(
const cpp2::DeleteTagsRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_deleteTags);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteTags);
}

folly::SemiFuture<cpp2::UpdateResponse> GraphStorageLocalServer::semifuture_updateVertex(
folly::Future<cpp2::UpdateResponse> GraphStorageLocalServer::future_updateVertex(
const cpp2::UpdateVertexRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, semifuture_updateVertex);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_updateVertex);
}

folly::SemiFuture<cpp2::UpdateResponse> GraphStorageLocalServer::semifuture_chainUpdateEdge(
folly::Future<cpp2::UpdateResponse> GraphStorageLocalServer::future_chainUpdateEdge(
const cpp2::UpdateEdgeRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, semifuture_chainUpdateEdge);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_chainUpdateEdge);
}

folly::SemiFuture<cpp2::UpdateResponse> GraphStorageLocalServer::semifuture_updateEdge(
folly::Future<cpp2::UpdateResponse> GraphStorageLocalServer::future_updateEdge(
const cpp2::UpdateEdgeRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, semifuture_updateEdge);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_updateEdge);
}

folly::SemiFuture<cpp2::GetUUIDResp> GraphStorageLocalServer::semifuture_getUUID(
folly::Future<cpp2::GetUUIDResp> GraphStorageLocalServer::future_getUUID(
const cpp2::GetUUIDReq& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetUUIDResp, semifuture_getUUID);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetUUIDResp, future_getUUID);
}

folly::SemiFuture<cpp2::LookupIndexResp> GraphStorageLocalServer::semifuture_lookupIndex(
folly::Future<cpp2::LookupIndexResp> GraphStorageLocalServer::future_lookupIndex(
const cpp2::LookupIndexRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::LookupIndexResp, semifuture_lookupIndex);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::LookupIndexResp, future_lookupIndex);
}

folly::SemiFuture<cpp2::GetNeighborsResponse> GraphStorageLocalServer::semifuture_lookupAndTraverse(
folly::Future<cpp2::GetNeighborsResponse> GraphStorageLocalServer::future_lookupAndTraverse(
const cpp2::LookupAndTraverseRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, semifuture_lookupAndTraverse);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, future_lookupAndTraverse);
}

folly::SemiFuture<cpp2::ScanResponse> GraphStorageLocalServer::semifuture_scanVertex(
folly::Future<cpp2::ScanResponse> GraphStorageLocalServer::future_scanVertex(
const cpp2::ScanVertexRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ScanResponse, semifuture_scanVertex);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ScanResponse, future_scanVertex);
}

folly::SemiFuture<cpp2::ScanResponse> GraphStorageLocalServer::semifuture_scanEdge(
folly::Future<cpp2::ScanResponse> GraphStorageLocalServer::future_scanEdge(
const cpp2::ScanEdgeRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ScanResponse, semifuture_scanEdge);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ScanResponse, future_scanEdge);
}

folly::SemiFuture<cpp2::KVGetResponse> GraphStorageLocalServer::semifuture_get(
folly::Future<cpp2::KVGetResponse> GraphStorageLocalServer::future_get(
const cpp2::KVGetRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::KVGetResponse, semifuture_get);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::KVGetResponse, future_get);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_put(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_put(
const cpp2::KVPutRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_put);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_put);
}

folly::SemiFuture<cpp2::ExecResponse> GraphStorageLocalServer::semifuture_remove(
folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_remove(
const cpp2::KVRemoveRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, semifuture_remove);
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_remove);
}

} // namespace nebula::storage
Loading

0 comments on commit 13eaa86

Please sign in to comment.