Skip to content

Commit

Permalink
Various smaller improvements for SPARQL Update (#1784)
Browse files Browse the repository at this point in the history
- Store the `DeltaTriplesCount` as signed integer. The difference would otherwise underflow.
- Add the number of insertions and deletions performed by an UPDATE operation to the returned metadata. Previously, only the change in the number of delta triples was reported. This allowed only limited conclusions about the effect of an update, because insertions and deletions can cancel each other out in the delta triple count over the course of an operation.
- Return a JSON from the `clear-delta-triples` command with the number of delta triples after the clear. This is more consistent with the other operations.
- The triples to insert/delete are now sorted and deduplicated earlier in the triple calculation (in `ExecuteUpdate` instead of `DeltaTriples`). Triples that are inserted by an operation are removed from the set of triples deleted by that same operation, so they are no longer deleted first and then re-inserted.
  • Loading branch information
Qup42 authored Feb 12, 2025
1 parent 7297305 commit 2697035
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 38 deletions.
22 changes: 22 additions & 0 deletions src/engine/ExecuteUpdate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,31 @@ ExecuteUpdate::computeGraphUpdateQuads(
cancellationHandle->throwIfCancelled();
}
}
sortAndRemoveDuplicates(toInsert);
sortAndRemoveDuplicates(toDelete);
metadata.inUpdate_ = DeltaTriplesCount{static_cast<int64_t>(toInsert.size()),
static_cast<int64_t>(toDelete.size())};
toDelete = setMinus(toDelete, toInsert);
metadata.triplePreparationTime_ = timer.msecs();

return {
IdTriplesAndLocalVocab{std::move(toInsert), std::move(localVocabInsert)},
IdTriplesAndLocalVocab{std::move(toDelete), std::move(localVocabDelete)}};
}

// _____________________________________________________________________________
void ExecuteUpdate::sortAndRemoveDuplicates(
    std::vector<IdTriple<>>& container) {
  // Sort first so that equal triples become adjacent; `std::unique` only
  // collapses runs of consecutive equal elements.
  ql::ranges::sort(container);
  // `std::unique` moves the surviving elements to the front and returns the
  // start of the leftover tail, which is then cut off.
  const auto firstRedundant = std::unique(container.begin(), container.end());
  container.erase(firstRedundant, container.end());
}

// _____________________________________________________________________________
std::vector<IdTriple<>> ExecuteUpdate::setMinus(
    const std::vector<IdTriple<>>& a, const std::vector<IdTriple<>>& b) {
  std::vector<IdTriple<>> difference;
  // The result can never hold more elements than `a`, so reserving that much
  // up front avoids reallocations while appending.
  difference.reserve(a.size());
  auto out = std::back_inserter(difference);
  // Appends, in order, the elements of `a` that have no counterpart in `b`.
  ql::ranges::set_difference(a, b, out);
  return difference;
}
13 changes: 13 additions & 0 deletions src/engine/ExecuteUpdate.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct UpdateMetadata {
Milliseconds triplePreparationTime_ = Zero;
Milliseconds insertionTime_ = Zero;
Milliseconds deletionTime_ = Zero;
std::optional<DeltaTriplesCount> inUpdate_;
};

class ExecuteUpdate {
Expand Down Expand Up @@ -71,4 +72,16 @@ class ExecuteUpdate {
const CancellationHandle& cancellationHandle,
UpdateMetadata& metadata);
FRIEND_TEST(ExecuteUpdate, computeGraphUpdateQuads);

// After the operation the vector is sorted and contains no duplicate
// elements.
static void sortAndRemoveDuplicates(std::vector<IdTriple<>>& container);
FRIEND_TEST(ExecuteUpdate, sortAndRemoveDuplicates);

// For two sorted vectors `A` and `B` return a new vector
// that contains the elements of `A\B`.
// Precondition: both inputs must be sorted.
static std::vector<IdTriple<>> setMinus(const std::vector<IdTriple<>>& a,
const std::vector<IdTriple<>>& b);
FRIEND_TEST(ExecuteUpdate, setMinus);
};
15 changes: 11 additions & 4 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,15 @@ Awaitable<void> Server::process(
[this] {
// Use `this` explicitly to silence false-positive errors on the
// captured `this` being unused.
this->index_.deltaTriplesManager().clear();
return this->index_.deltaTriplesManager().modify<DeltaTriplesCount>(
[](auto& deltaTriples) {
deltaTriples.clear();
return deltaTriples.getCounts();
});
},
handle);
co_await std::move(coroutine);
response = createOkResponse("Delta triples have been cleared", request,
MediaType::textPlain);
auto countAfterClear = co_await std::move(coroutine);
response = createJsonResponse(nlohmann::json{countAfterClear}, request);
} else if (auto cmd = checkParameter("cmd", "get-settings")) {
logCommand(cmd, "get server settings");
response = createJsonResponse(RuntimeParameters().toMap(), request);
Expand Down Expand Up @@ -821,6 +824,10 @@ json Server::createResponseMetadataForUpdate(
response["delta-triples"]["after"] = nlohmann::json(countAfter);
response["delta-triples"]["difference"] =
nlohmann::json(countAfter - countBefore);
if (updateMetadata.inUpdate_.has_value()) {
response["delta-triples"]["operation"] =
json(updateMetadata.inUpdate_.value());
}
response["time"]["planning"] =
formatTime(runtimeInfoWholeOp.timeQueryPlanning);
response["time"]["where"] =
Expand Down
8 changes: 5 additions & 3 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ void DeltaTriples::modifyTriplesImpl(CancellationHandle cancellationHandle,
TriplesToHandlesMap& targetMap,
TriplesToHandlesMap& inverseMap) {
rewriteLocalVocabEntriesAndBlankNodes(triples);
ql::ranges::sort(triples);
auto first = std::unique(triples.begin(), triples.end());
triples.erase(first, triples.end());
AD_EXPENSIVE_CHECK(ql::ranges::is_sorted(triples));
AD_EXPENSIVE_CHECK(std::unique(triples.begin(), triples.end()) ==
triples.end());
std::erase_if(triples, [&targetMap](const IdTriple<0>& triple) {
return targetMap.contains(triple);
});
Expand Down Expand Up @@ -248,6 +248,8 @@ template void DeltaTriplesManager::modify<void>(
std::function<void(DeltaTriples&)> const&);
template nlohmann::json DeltaTriplesManager::modify<nlohmann::json>(
const std::function<nlohmann::json(DeltaTriples&)>&);
template DeltaTriplesCount DeltaTriplesManager::modify<DeltaTriplesCount>(
const std::function<DeltaTriplesCount(DeltaTriples&)>&);

// _____________________________________________________________________________
void DeltaTriplesManager::clear() {
  // Route the call through `modify` so that `DeltaTriples::clear` is applied
  // to the managed `DeltaTriples` in the same way as any other modification.
  modify<void>(&DeltaTriples::clear);
}
Expand Down
12 changes: 8 additions & 4 deletions src/index/DeltaTriples.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class SharedLocatedTriplesSnapshot

// A class for keeping track of the number of triples of the `DeltaTriples`.
struct DeltaTriplesCount {
size_t triplesInserted_;
size_t triplesDeleted_;
int64_t triplesInserted_;
int64_t triplesDeleted_;

/// Output as json. The signature of this function is mandated by the json
/// library to allow for implicit conversion.
Expand Down Expand Up @@ -146,8 +146,12 @@ class DeltaTriples {
void clear();

// The number of delta triples added and subtracted.
size_t numInserted() const { return triplesInserted_.size(); }
size_t numDeleted() const { return triplesDeleted_.size(); }
int64_t numInserted() const {
return static_cast<int64_t>(triplesInserted_.size());
}
int64_t numDeleted() const {
return static_cast<int64_t>(triplesDeleted_.size());
}
DeltaTriplesCount getCounts() const;

// Insert triples.
Expand Down
5 changes: 2 additions & 3 deletions test/DeltaTriplesCountTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ TEST(DeltaTriplesCountTest, toJson) {
TEST(DeltaTriplesCountTest, subtractOperator) {
constexpr DeltaTriplesCount count1{10, 5};
constexpr DeltaTriplesCount count2{3, 2};
constexpr DeltaTriplesCount expected{7, 3};
const DeltaTriplesCount actual = count1 - count2;
EXPECT_THAT(actual, testing::Eq(expected));
EXPECT_THAT(count1 - count2, testing::Eq(DeltaTriplesCount{7, 3}));
EXPECT_THAT(count2 - count1, testing::Eq(DeltaTriplesCount{-7, -3}));
}
30 changes: 19 additions & 11 deletions test/DeltaTriplesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ TEST_F(DeltaTriplesTest, insertTriplesAndDeleteTriples) {

EXPECT_THAT(deltaTriples, StateIs(0, 0, 0, {}, {}));

// Inserting triples.
// Inserting triples. The triples being inserted must be sorted.
deltaTriples.insertTriples(
cancellationHandle,
makeIdTriples(vocab, localVocab, {"<A> <B> <C>", "<A> <B> <D>"}));
Expand All @@ -164,14 +164,14 @@ TEST_F(DeltaTriplesTest, insertTriplesAndDeleteTriples) {
deltaTriples,
StateIs(3, 0, 3, {"<A> <B> <C>", "<A> <B> <D>", "<A> <low> <a>"}, {}));

// Inserting unsorted triples works.
// Insert more triples.
deltaTriples.insertTriples(
cancellationHandle,
makeIdTriples(vocab, localVocab, {"<B> <D> <C>", "<B> <C> <D>"}));
makeIdTriples(vocab, localVocab, {"<B> <C> <D>", "<B> <D> <C>"}));
EXPECT_THAT(deltaTriples,
StateIs(5, 0, 5,
{"<A> <B> <C>", "<A> <B> <D>", "<B> <D> <C>",
"<B> <C> <D>", "<A> <low> <a>"},
{"<A> <B> <C>", "<A> <B> <D>", "<B> <C> <D>",
"<B> <D> <C>", "<A> <low> <a>"},
{}));

// Inserting already inserted triples has no effect.
Expand Down Expand Up @@ -212,10 +212,20 @@ TEST_F(DeltaTriplesTest, insertTriplesAndDeleteTriples) {
{"<A> <B> <C>", "<B> <C> <D>", "<A> <low> <a>", "<B> <D> <C>"},
{"<A> <B> <D>", "<A> <B> <F>", "<A> <next> <B>", "<B> <next> <C>"}));

// Deleting unsorted triples.
// Unsorted triples are not allowed.
if constexpr (ad_utility::areExpensiveChecksEnabled) {
AD_EXPECT_THROW_WITH_MESSAGE(
deltaTriples.deleteTriples(
cancellationHandle,
makeIdTriples(vocab, localVocab,
{"<C> <prev> <B>", "<B> <prev> <A>"})),
testing::_);
}

// Deleting triples.
deltaTriples.deleteTriples(
cancellationHandle,
makeIdTriples(vocab, localVocab, {"<C> <prev> <B>", "<B> <prev> <A>"}));
makeIdTriples(vocab, localVocab, {"<B> <prev> <A>", "<C> <prev> <B>"}));
EXPECT_THAT(
deltaTriples,
StateIs(4, 6, 10,
Expand Down Expand Up @@ -347,7 +357,7 @@ TEST_F(DeltaTriplesTest, DeltaTriplesManager) {
absl::StrCat("<A> <B> <E", threadIdx, ">")});
auto triplesToDelete = makeIdTriples(
vocab, localVocab,
{"<A> <C> <E>", absl::StrCat("<A> <B> <E", threadIdx, ">"),
{"<A> <A> <E>", absl::StrCat("<A> <B> <E", threadIdx, ">"),
absl::StrCat("<A> <B> <F", threadIdx, ">")});
// Insert the `triplesToInsert`.
deltaTriplesManager.modify<void>([&](DeltaTriples& deltaTriples) {
Expand Down Expand Up @@ -416,10 +426,8 @@ TEST_F(DeltaTriplesTest, DeltaTriplesManager) {
// thread-exclusive triple and inserts one thread-exclusive triple that is
// deleted right after (This triple is stored as deleted in the `DeltaTriples`
// because it might be contained in the original input). Additionally, there
// is one common triple inserted by// all the threads and one common triple
// is one common triple inserted by all the threads and one common triple
// that is deleted by all the threads.
//

auto deltaImpl = deltaTriplesManager.deltaTriples_.rlock();
EXPECT_THAT(*deltaImpl, NumTriples(numThreads + 1, 2 * numThreads + 1,
3 * numThreads + 2));
Expand Down
2 changes: 1 addition & 1 deletion test/DeltaTriplesTestHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ inline auto NumTriplesInAllPermutations =
// `getCounts()` of a `DeltaTriples` and `numTriples()` for all
// `LocatedTriplesPerBlock` of the `DeltaTriples`.
inline auto NumTriples =
[](size_t inserted, size_t deleted,
[](int64_t inserted, int64_t deleted,
size_t inAllPermutations) -> testing::Matcher<const DeltaTriples&> {
return testing::AllOf(
AD_PROPERTY(DeltaTriples, numInserted, testing::Eq(inserted)),
Expand Down
66 changes: 55 additions & 11 deletions test/ExecuteUpdateTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ TEST(ExecuteUpdate, executeUpdate) {
auto expectExecuteUpdate =
[&index, &expectExecuteUpdateHelper](
const std::string& update,
const testing::Matcher<const DeltaTriples&>& deltaTriplesMatcher) {
const testing::Matcher<const DeltaTriples&>& deltaTriplesMatcher,
source_location sourceLocation = source_location::current()) {
auto l = generateLocationTrace(sourceLocation);
DeltaTriples deltaTriples{index};
expectExecuteUpdateHelper(update, deltaTriples);
EXPECT_THAT(deltaTriples, deltaTriplesMatcher);
Expand Down Expand Up @@ -127,7 +129,9 @@ TEST(ExecuteUpdate, computeGraphUpdateQuads) {
[&executeComputeGraphUpdateQuads](
const std::string& update,
const Matcher<const std::vector<::IdTriple<>>&>& toInsertMatcher,
const Matcher<const std::vector<::IdTriple<>>&>& toDeleteMatcher) {
const Matcher<const std::vector<::IdTriple<>>&>& toDeleteMatcher,
source_location sourceLocation = source_location::current()) {
auto l = generateLocationTrace(sourceLocation);
EXPECT_THAT(executeComputeGraphUpdateQuads(update),
Pair(AD_FIELD(ExecuteUpdate::IdTriplesAndLocalVocab,
idTriples_, toInsertMatcher),
Expand All @@ -137,7 +141,9 @@ TEST(ExecuteUpdate, computeGraphUpdateQuads) {
auto expectComputeGraphUpdateQuadsFails =
[&executeComputeGraphUpdateQuads](
const std::string& update,
const Matcher<const std::string&>& messageMatcher) {
const Matcher<const std::string&>& messageMatcher,
source_location sourceLocation = source_location::current()) {
auto l = generateLocationTrace(sourceLocation);
AD_EXPECT_THROW_WITH_MESSAGE(executeComputeGraphUpdateQuads(update),
messageMatcher);
};
Expand All @@ -151,22 +157,18 @@ TEST(ExecuteUpdate, computeGraphUpdateQuads) {
ElementsAreArray({IdTriple(Id("<z>"), Id("<label>"), Id("\"zz\"@en"))}));
expectComputeGraphUpdateQuads(
"DELETE { ?s <is-a> ?o } INSERT { <s> <p> <o> } WHERE { ?s <is-a> ?o }",
ElementsAreArray({IdTriple(LVI("<s>"), LVI("<p>"), LVI("<o>")),
IdTriple(LVI("<s>"), LVI("<p>"), LVI("<o>"))}),
ElementsAreArray({IdTriple(LVI("<s>"), LVI("<p>"), LVI("<o>"))}),
ElementsAreArray({IdTriple(Id("<x>"), Id("<is-a>"), Id("<y>")),
IdTriple(Id("<y>"), Id("<is-a>"), Id("<x>"))}));
expectComputeGraphUpdateQuads(
"DELETE { <s> <p> <o> } INSERT { <s> <p> <o> } WHERE { ?s <is-a> ?o }",
ElementsAreArray({IdTriple(LVI("<s>"), LVI("<p>"), LVI("<o>")),
IdTriple(LVI("<s>"), LVI("<p>"), LVI("<o>"))}),
ElementsAreArray({IdTriple(LVI("<s>"), LVI("<p>"), LVI("<o>")),
IdTriple(LVI("<s>"), LVI("<p>"), LVI("<o>"))}));
ElementsAreArray({IdTriple(LVI("<s>"), LVI("<p>"), LVI("<o>"))}),
IsEmpty());
expectComputeGraphUpdateQuads(
"DELETE { ?s <is-a> ?o } INSERT { ?s <is-a> ?o } WHERE { ?s <is-a> ?o }",
ElementsAreArray({IdTriple(Id("<x>"), Id("<is-a>"), Id("<y>")),
IdTriple(Id("<y>"), Id("<is-a>"), Id("<x>"))}),
ElementsAreArray({IdTriple(Id("<x>"), Id("<is-a>"), Id("<y>")),
IdTriple(Id("<y>"), Id("<is-a>"), Id("<x>"))}));
IsEmpty());
expectComputeGraphUpdateQuads(
"DELETE WHERE { ?s ?p ?o }", IsEmpty(),
UnorderedElementsAreArray(
Expand Down Expand Up @@ -370,3 +372,45 @@ TEST(ExecuteUpdate, computeAndAddQuadsForResultRow) {
ElementsAreArray({IdTriple{{V(0), V(1), V(1), V(3)}},
IdTriple{{V(0), V(1), V(2), V(3)}}}));
}

TEST(ExecuteUpdate, sortAndRemoveDuplicates) {
  // Shorthand for building a triple from plain integers via the `V` helper;
  // the fourth component defaults to 0.
  auto triple = [&](uint64_t s, uint64_t p, uint64_t o, uint64_t g = 0) {
    return ::IdTriple({V(s), V(p), V(o), V(g)});
  };
  // Runs `sortAndRemoveDuplicates` on a copy of `input` and checks that the
  // result equals `expected` element by element. The source location makes a
  // failure point at the calling line.
  auto check = [](std::vector<IdTriple<>> input,
                  const std::vector<IdTriple<>>& expected,
                  source_location l = source_location::current()) {
    auto trace = generateLocationTrace(l);
    ExecuteUpdate::sortAndRemoveDuplicates(input);
    EXPECT_THAT(input, testing::ElementsAreArray(expected));
  };

  // Empty and single-element inputs are left unchanged.
  check({}, {});
  check({triple(1, 1, 1)}, {triple(1, 1, 1)});
  // Already sorted input without duplicates is left unchanged.
  check({triple(1, 1, 1), triple(2, 2, 2)},
        {triple(1, 1, 1), triple(2, 2, 2)});
  // Unsorted input gets sorted.
  check({triple(2, 2, 2), triple(1, 1, 1)},
        {triple(1, 1, 1), triple(2, 2, 2)});
  // Duplicates are collapsed to a single element.
  check({triple(1, 1, 1), triple(1, 1, 1)}, {triple(1, 1, 1)});
  // Unsorted input with non-adjacent duplicates.
  check({triple(2, 2, 2), triple(3, 3, 3), triple(3, 3, 3), triple(2, 2, 2),
         triple(1, 1, 1)},
        {triple(1, 1, 1), triple(2, 2, 2), triple(3, 3, 3)});
}
TEST(ExecuteUpdate, setMinus) {
  // Shorthand for building a triple from plain integers via the `V` helper;
  // the fourth component defaults to 0.
  auto triple = [&](uint64_t s, uint64_t p, uint64_t o, uint64_t g = 0) {
    return ::IdTriple({V(s), V(p), V(o), V(g)});
  };
  // Checks that `setMinus(a, b)` yields exactly `expected`. The source
  // location makes a failure point at the calling line.
  auto check = [](std::vector<IdTriple<>> a, std::vector<IdTriple<>> b,
                  const std::vector<IdTriple<>>& expected,
                  source_location l = source_location::current()) {
    auto trace = generateLocationTrace(l);
    EXPECT_THAT(ExecuteUpdate::setMinus(a, b),
                testing::ElementsAreArray(expected));
  };

  // Both inputs empty.
  check({}, {}, {});
  // Subtracting nothing leaves the first input unchanged.
  check({triple(1, 2, 3), triple(4, 5, 6)}, {},
        {triple(1, 2, 3), triple(4, 5, 6)});
  // Only the elements that are not in the second input remain.
  check({triple(1, 2, 3), triple(4, 5, 6), triple(7, 8, 9)},
        {triple(4, 5, 6), triple(7, 8, 9)}, {triple(1, 2, 3)});
  // The second input is a superset of the first: nothing remains.
  check({triple(1, 2, 3)},
        {triple(1, 2, 3), triple(4, 5, 6), triple(7, 8, 9)}, {});
}
3 changes: 2 additions & 1 deletion test/ServerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ TEST(ServerTest, createResponseMetadata) {
json deltaTriplesJson{
{"before", {{"inserted", 0}, {"deleted", 0}, {"total", 0}}},
{"after", {{"inserted", 1}, {"deleted", 0}, {"total", 1}}},
{"difference", {{"inserted", 1}, {"deleted", 0}, {"total", 1}}}};
{"difference", {{"inserted", 1}, {"deleted", 0}, {"total", 1}}},
{"operation", {{"inserted", 1}, {"deleted", 0}, {"total", 1}}}};
json locatedTriplesJson{
{"SPO", {{"blocks-affected", 1}, {"blocks-total", 1}}},
{"POS", {{"blocks-affected", 1}, {"blocks-total", 1}}},
Expand Down

0 comments on commit 2697035

Please sign in to comment.