Skip to content

Commit

Permalink
Update getResponse signature
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu committed Mar 4, 2022
1 parent 22ae6d6 commit 0d0d4fa
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 25 deletions.
9 changes: 6 additions & 3 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq
}
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) {
return client->semifuture_chainUpdateEdge(r);
});
Expand Down Expand Up @@ -102,7 +103,8 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) {
return client->semifuture_chainAddEdges(r);
});
Expand Down Expand Up @@ -158,7 +160,8 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
chainReq.term_ref() = termId;
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->semifuture_chainDeleteEdges(r);
});
Expand Down
19 changes: 5 additions & 14 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
}

std::pair<HostAddr, cpp2::UpdateVertexRequest> request;

DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(param.space);
if (!status.ok()) {
Expand All @@ -370,7 +368,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(host.status());
}
request.first = std::move(host).value();
cpp2::UpdateVertexRequest req;
req.space_id_ref() = param.space;
req.vertex_id_ref() = vertexId;
Expand All @@ -383,10 +380,10 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
if (condition.size() > 0) {
req.condition_ref() = std::move(condition);
}
request.second = std::move(req);

return getResponse(param.evb,
std::move(request),
host.value(),
req,
[](ThriftClientType* client, const cpp2::UpdateVertexRequest& r) {
return client->semifuture_updateVertex(r);
});
Expand All @@ -405,8 +402,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
}

std::pair<HostAddr, cpp2::UpdateEdgeRequest> request;

DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(space);
if (!status.ok()) {
Expand All @@ -423,7 +418,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(host.status());
}
request.first = std::move(host).value();
cpp2::UpdateEdgeRequest req;
req.space_id_ref() = space;
req.edge_key_ref() = edgeKey;
Expand All @@ -435,10 +429,10 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
if (condition.size() > 0) {
req.condition_ref() = std::move(condition);
}
request.second = std::move(req);

return getResponse(param.evb,
std::move(request),
host.value(),
req,
[useExperimentalFeature = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::UpdateEdgeRequest& r) {
return useExperimentalFeature ? client->semifuture_chainUpdateEdge(r)
Expand All @@ -449,7 +443,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID space,
const std::string& name,
folly::EventBase* evb) {
std::pair<HostAddr, cpp2::GetUUIDReq> request;
DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(space);
if (!status.ok()) {
Expand All @@ -466,15 +459,13 @@ folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID s
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::GetUUIDResp>>(host.status());
}
request.first = std::move(host).value();
cpp2::GetUUIDReq req;
req.space_id_ref() = space;
req.part_id_ref() = part;
req.name_ref() = name;
request.second = std::move(req);

return getResponse(
evb, std::move(request), [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
evb, host.value(), req, [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
return client->semifuture_getUUID(r);
});
}
Expand Down
13 changes: 6 additions & 7 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
std::vector<folly::Future<RespWrapper>> respFutures;
respFutures.reserve(requests.size());

for (const std::pair<HostAddr, Request>& req : requests) {
for (const auto& req : requests) {
auto start = time::WallClock::fastNowInMicroSec();

// Future process code will be executed on the IO thread
// Since all requests are sent using the same eventbase, all
// then-callback will be executed on the same IO thread
auto fut =
getResponse(evb, req, std::move(remoteFunc))
getResponse(evb, req.first, req.second, std::move(remoteFunc))
.thenValue([this, start, req](StatusOr<Response>&& status) {
std::unordered_map<PartitionID, nebula::cpp2::ErrorCode> failedParts;
if (status.ok()) {
Expand Down Expand Up @@ -149,15 +149,14 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
template <typename ClientType, typename ClientManagerType>
template <class Request, class RemoteFunc, class Response>
folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerType>::getResponse(
folly::EventBase* evb, const std::pair<HostAddr, Request>& request, RemoteFunc&& remoteFunc) {
folly::EventBase* evb, const HostAddr& host, const Request& request, RemoteFunc&& remoteFunc) {
static_assert(folly::isSemiFuture<
typename std::result_of<RemoteFunc(ClientType*, const Request&)>::type>::value);

auto host = request.first;
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
auto partsId = getReqPartsId(request.second);
return remoteFunc(client.get(), request.second)
auto spaceId = request.get_space_id();
auto partsId = getReqPartsId(request);
return remoteFunc(client.get(), request)
.via(evb)
.thenValue([spaceId, this](Response&& resp) mutable -> StatusOr<Response> {
auto& result = resp.get_result();
Expand Down
4 changes: 3 additions & 1 deletion src/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "clients/meta/MetaClient.h"
#include "common/base/Base.h"
#include "common/base/StatusOr.h"
#include "common/datatypes/HostAddr.h"
#include "common/meta/Common.h"
#include "common/thrift/ThriftClientManager.h"
#include "interface/gen-cpp2/storage_types.h"
Expand Down Expand Up @@ -144,7 +145,8 @@ class StorageClientBase {
class Response = typename std::result_of<RemoteFunc(ClientType* client,
const Request&)>::type::value_type>
folly::Future<StatusOr<Response>> getResponse(folly::EventBase* evb,
const std::pair<HostAddr, Request>& request,
const HostAddr& host,
const Request& request,
RemoteFunc&& remoteFunc);

// Cluster given ids into the host they belong to
Expand Down

0 comments on commit 0d0d4fa

Please sign in to comment.