Skip to content

Commit

Permalink
Refactor storage client
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu committed Mar 16, 2022
1 parent a57acd0 commit 5814d59
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 357 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ jobs:
;;
esac
working-directory: tests/
timeout-minutes: 2
timeout-minutes: 4
- name: Pytest
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} test
Expand Down Expand Up @@ -266,7 +266,7 @@ jobs:
run: |
make standalone-up
working-directory: tests/
timeout-minutes: 60
timeout-minutes: 4
- name: TCK
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} standalone-tck
Expand Down
15 changes: 9 additions & 6 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq
}
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) {
return client->future_chainUpdateEdge(r);
return client->semifuture_chainUpdateEdge(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
Expand Down Expand Up @@ -102,9 +103,10 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) {
return client->future_chainAddEdges(r);
return client->semifuture_chainAddEdges(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
Expand Down Expand Up @@ -158,9 +160,10 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
chainReq.term_ref() = termId;
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->future_chainDeleteEdges(r);
return client->semifuture_chainDeleteEdges(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
Expand Down
59 changes: 25 additions & 34 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::getNeighbors(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::GetNeighborsRequest& r) {
return client->future_getNeighbors(r);
return client->semifuture_getNeighbors(r);
});
}

Expand Down Expand Up @@ -142,7 +142,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::AddVerticesRequest& r) {
return client->future_addVertices(r);
return client->semifuture_addVertices(r);
});
}

Expand Down Expand Up @@ -180,8 +180,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addEdges(const CommonReq
std::move(requests),
[useToss = param.useExperimentalFeature](ThriftClientType* client,
const cpp2::AddEdgesRequest& r) {
return useToss ? client->future_chainAddEdges(r)
: client->future_addEdges(r);
return useToss ? client->semifuture_chainAddEdges(r)
: client->semifuture_addEdges(r);
});
}

Expand Down Expand Up @@ -237,7 +237,7 @@ StorageRpcRespFuture<cpp2::GetPropResponse> StorageClient::getProps(

return collectResponse(
param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) {
return client->future_getProps(r);
return client->semifuture_getProps(r);
});
}

Expand Down Expand Up @@ -270,8 +270,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteEdges(
std::move(requests),
[useToss = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return useToss ? client->future_chainDeleteEdges(r)
: client->future_deleteEdges(r);
return useToss ? client->semifuture_chainDeleteEdges(r)
: client->semifuture_deleteEdges(r);
});
}

Expand Down Expand Up @@ -303,7 +303,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteVertices(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteVerticesRequest& r) {
return client->future_deleteVertices(r);
return client->semifuture_deleteVertices(r);
});
}

Expand Down Expand Up @@ -335,7 +335,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteTags(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteTagsRequest& r) {
return client->future_deleteTags(r);
return client->semifuture_deleteTags(r);
});
}

Expand All @@ -352,8 +352,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
}

std::pair<HostAddr, cpp2::UpdateVertexRequest> request;

DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(param.space);
if (!status.ok()) {
Expand All @@ -370,7 +368,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(host.status());
}
request.first = std::move(host).value();
cpp2::UpdateVertexRequest req;
req.space_id_ref() = param.space;
req.vertex_id_ref() = vertexId;
Expand All @@ -383,12 +380,12 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
if (condition.size() > 0) {
req.condition_ref() = std::move(condition);
}
request.second = std::move(req);

return getResponse(param.evb,
std::move(request),
host.value(),
req,
[](ThriftClientType* client, const cpp2::UpdateVertexRequest& r) {
return client->future_updateVertex(r);
return client->semifuture_updateVertex(r);
});
}

Expand All @@ -405,8 +402,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
}

std::pair<HostAddr, cpp2::UpdateEdgeRequest> request;

DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(space);
if (!status.ok()) {
Expand All @@ -423,7 +418,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(host.status());
}
request.first = std::move(host).value();
cpp2::UpdateEdgeRequest req;
req.space_id_ref() = space;
req.edge_key_ref() = edgeKey;
Expand All @@ -435,21 +429,20 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
if (condition.size() > 0) {
req.condition_ref() = std::move(condition);
}
request.second = std::move(req);

return getResponse(param.evb,
std::move(request),
host.value(),
req,
[useExperimentalFeature = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::UpdateEdgeRequest& r) {
return useExperimentalFeature ? client->future_chainUpdateEdge(r)
: client->future_updateEdge(r);
return useExperimentalFeature ? client->semifuture_chainUpdateEdge(r)
: client->semifuture_updateEdge(r);
});
}

folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID space,
const std::string& name,
folly::EventBase* evb) {
std::pair<HostAddr, cpp2::GetUUIDReq> request;
DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(space);
if (!status.ok()) {
Expand All @@ -466,16 +459,14 @@ folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID s
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::GetUUIDResp>>(host.status());
}
request.first = std::move(host).value();
cpp2::GetUUIDReq req;
req.space_id_ref() = space;
req.part_id_ref() = part;
req.name_ref() = name;
request.second = std::move(req);

return getResponse(
evb, std::move(request), [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
return client->future_getUUID(r);
evb, host.value(), req, [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
return client->semifuture_getUUID(r);
});
}

Expand Down Expand Up @@ -523,7 +514,7 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> StorageClient::lookupIndex(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::LookupIndexRequest& r) {
return client->future_lookupIndex(r);
return client->semifuture_lookupIndex(r);
});
}

Expand Down Expand Up @@ -552,7 +543,7 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::lookupAndTravers
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::LookupAndTraverseRequest& r) {
return client->future_lookupAndTraverse(r);
return client->semifuture_lookupAndTraverse(r);
});
}

Expand Down Expand Up @@ -583,7 +574,7 @@ StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanEdge(

return collectResponse(
param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::ScanEdgeRequest& r) {
return client->future_scanEdge(r);
return client->semifuture_scanEdge(r);
});
}

Expand Down Expand Up @@ -615,7 +606,7 @@ StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanVertex(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::ScanVertexRequest& r) {
return client->future_scanVertex(r);
return client->semifuture_scanVertex(r);
});
}

Expand All @@ -641,7 +632,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::KVGetResponse>> StorageClient::get(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVGetRequest& r) {
return client->future_get(r);
return client->semifuture_get(r);
});
}

Expand All @@ -666,7 +657,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> StorageClient::put(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVPutRequest& r) {
return client->future_put(r);
return client->semifuture_put(r);
});
}

Expand All @@ -691,7 +682,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> StorageClient::remove(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVRemoveRequest& r) {
return client->future_remove(r);
return client->semifuture_remove(r);
});
}

Expand Down
Loading

0 comments on commit 5814d59

Please sign in to comment.