Skip to content

Commit

Permalink
fix all path memory tracker (#5621)
Browse files Browse the repository at this point in the history
* fix all path memory tracker

* fix error

* Update pull_request.yml

enable sccache debug log

* Update pull_request.yml

add ninja -v

* Update pull_request.yml

* Update pull_request.yml

* Update pull_request.yml

* Update pull_request.yml

---------

Co-authored-by: George <58841610+Shinji-IkariG@users.noreply.github.com>
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 10, 2023
1 parent e14ef98 commit a157e95
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
return future.via(runner())
.ensure([this, buildPathTime]() { addState("build_path_time", buildPathTime); })
.thenValue([this](auto&& resp) {
UNUSED(resp);
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
}
if (!withProp_ || emptyPropVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
Expand Down Expand Up @@ -485,8 +487,9 @@ folly::Future<Status> AllPathsExecutor::conjunctPath(std::vector<NPath*>& leftPa
runner(), [this, start, end, reverse]() { return probe(start, end, reverse); }));
}
}
return folly::collect(futures).via(runner()).thenValue(
[this, path = std::move(oneWayPath)](std::vector<std::vector<Row>>&& resps) {
return folly::collect(futures)
.via(runner())
.thenValue([this, path = std::move(oneWayPath)](std::vector<std::vector<Row>>&& resps) {
memory::MemoryCheckGuard guard;
result_.rows = std::move(path);
for (auto& rows : resps) {
Expand All @@ -510,6 +513,14 @@ folly::Future<Status> AllPathsExecutor::conjunctPath(std::vector<NPath*>& leftPa
result_.rows.resize(limit_);
}
return Status::OK();
})
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand All @@ -528,6 +539,7 @@ void AllPathsExecutor::buildHashTable(std::vector<NPath*>& paths, bool reverse)
}

std::vector<Row> AllPathsExecutor::probe(size_t start, size_t end, bool reverse) {
memory::MemoryCheckGuard guard;
auto buildPath = [](std::vector<Value>& leftPath,
const Value& intersectVertex,
std::vector<Value>& rightPath) {
Expand All @@ -545,6 +557,9 @@ std::vector<Row> AllPathsExecutor::probe(size_t start, size_t end, bool reverse)
std::vector<Row> result;
Row emptyPropVerticesRow;
for (size_t i = start; i < end; ++i) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
break;
}
auto& probePath = probePaths_[i];
auto& edgeVal = probePath->edge;
const auto& intersectVid = edgeVal.getEdge().dst;
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/algo/AllPathsExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class AllPathsExecutor final : public StorageAccessExecutor {
bool noLoop_{false};
size_t limit_{std::numeric_limits<size_t>::max()};
std::atomic<size_t> cnt_{0};
std::atomic<bool> memoryExceeded_{false};
size_t maxStep_{0};

size_t leftSteps_{0};
Expand Down

0 comments on commit a157e95

Please sign in to comment.