Skip to content

Commit

Permalink
fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Jul 9, 2023
1 parent cc24af4 commit 1baa0f2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
24 changes: 19 additions & 5 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
return future.via(runner())
.ensure([this, buildPathTime]() { addState("build_path_time", buildPathTime); })
.thenValue([this](auto&& resp) {
UNUSED(resp);
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
}
if (!withProp_ || emptyPropVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
Expand Down Expand Up @@ -259,7 +261,6 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
size_t end,
std::shared_ptr<std::vector<NPath*>> pathsPtr,
bool reverse) {
memory::MemoryCheckGuard guard;
auto maxStep = reverse ? rightSteps_ : leftSteps_;
if (step > maxStep) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
Expand Down Expand Up @@ -334,6 +335,7 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
}
return folly::collect(futures).via(runner()).thenValue(
[currentStepResult = newPathsPtr](std::vector<std::vector<NPath*>>&& paths) {
memory::MemoryCheckGuard guard;
std::vector<NPath*> result = std::move(*currentStepResult);
for (auto& path : paths) {
if (path.empty()) {
Expand Down Expand Up @@ -458,7 +460,6 @@ std::vector<Row> AllPathsExecutor::buildOneWayPathFromHashTable(bool reverse) {

folly::Future<Status> AllPathsExecutor::conjunctPath(std::vector<NPath*>& leftPaths,
std::vector<NPath*>& rightPaths) {
memory::MemoryCheckGuard guard;
if (leftPaths.empty() || rightPaths.empty()) {
return folly::makeFuture<Status>(Status::OK());
}
Expand Down Expand Up @@ -486,8 +487,10 @@ folly::Future<Status> AllPathsExecutor::conjunctPath(std::vector<NPath*>& leftPa
runner(), [this, start, end, reverse]() { return probe(start, end, reverse); }));
}
}
return folly::collect(futures).via(runner()).thenValue(
[this, path = std::move(oneWayPath)](std::vector<std::vector<Row>>&& resps) {
return folly::collect(futures)
.via(runner())
.thenValue([this, path = std::move(oneWayPath)](std::vector<std::vector<Row>>&& resps) {
memory::MemoryCheckGuard guard;
result_.rows = std::move(path);
for (auto& rows : resps) {
if (rows.empty()) {
Expand All @@ -510,6 +513,14 @@ folly::Future<Status> AllPathsExecutor::conjunctPath(std::vector<NPath*>& leftPa
result_.rows.resize(limit_);
}
return Status::OK();
})
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -546,6 +557,9 @@ std::vector<Row> AllPathsExecutor::probe(size_t start, size_t end, bool reverse)
std::vector<Row> result;
Row emptyPropVerticesRow;
for (size_t i = start; i < end; ++i) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
break;
}
auto& probePath = probePaths_[i];
auto& edgeVal = probePath->edge;
const auto& intersectVid = edgeVal.getEdge().dst;
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/algo/AllPathsExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class AllPathsExecutor final : public StorageAccessExecutor {
bool noLoop_{false};
size_t limit_{std::numeric_limits<size_t>::max()};
std::atomic<size_t> cnt_{0};
std::atomic<bool> memoryExceeded_{false};
size_t maxStep_{0};

size_t leftSteps_{0};
Expand Down

0 comments on commit 1baa0f2

Please sign in to comment.