Skip to content

Commit

Permalink
Basic support of SERVICE clause (#793)
Browse files Browse the repository at this point in the history
Support SERVICE queries to a remote endpoint specified via an IRI. The query is sent using a POST request with headers `Content-Type: application/sparql-query` and `Accept: text/tab-separated-values`. The result TSV is parsed and words that are not contained in the index vocabulary are added to the local vocabulary of the result of the SERVICE operation.

Shortcomings of this implementation are: there is no timeout; the TSV is first read (and stored) completely and only then processed further; TSV is simple to parse but may be ambiguous with respect to how IRIs or literals are encoded; the `SILENT` keyword (telling the engine to ignore the SERVICE clause if the query fails) is currently not supported; and variable endpoints are currently not supported. Also, the query is currently sent after query planning, so the query planner has no information about the size, cost, or multiplicities of the result.
  • Loading branch information
hannahbast authored Feb 22, 2023
1 parent 0e4a076 commit 857099c
Show file tree
Hide file tree
Showing 20 changed files with 675 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_library(engine
Union.cpp Union.h
MultiColumnJoin.cpp MultiColumnJoin.h
TransitivePath.cpp TransitivePath.h
Service.cpp Service.h
Values.cpp Values.h
Bind.cpp Bind.h
idTable/IdTable.h
Expand All @@ -39,5 +40,4 @@ add_library(engine
../util/Parameters.h RuntimeInformation.cpp CheckUsePatternTrick.cpp CheckUsePatternTrick.h
VariableToColumnMap.cpp ExportQueryExecutionTrees.cpp )


target_link_libraries(engine index parser sparqlExpressions http SortPerformanceEstimator absl::flat_hash_set ${ICU_LIBRARIES} boost_iostreams)
2 changes: 2 additions & 0 deletions src/engine/CheckUsePatternTrick.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ bool isVariableContainedInGraphPatternOperation(
});
} else if constexpr (std::is_same_v<T, p::Values>) {
return ad_utility::contains(arg._inlineValues._variables, variable);
} else if constexpr (std::is_same_v<T, p::Service>) {
return ad_utility::contains(arg.visibleVariables_, variable);
} else {
static_assert(std::is_same_v<T, p::TransPath>);
// The `TransPath` is set up later in the query planning, when this
Expand Down
11 changes: 11 additions & 0 deletions src/engine/LocalVocab.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ LocalVocabIndex LocalVocab::getIndexAndAddIfNotContained(std::string&& word) {
return getIndexAndAddIfNotContainedImpl(std::move(word));
}

// _____________________________________________________________________________
std::optional<LocalVocabIndex> LocalVocab::getIndexOrNullopt(
    const std::string& word) const {
  // A single lookup in the word-to-index map; a miss is reported as an empty
  // optional and the vocabulary is left unchanged.
  const auto it = wordsToIndexesMap_.find(word);
  if (it == wordsToIndexesMap_.end()) {
    return std::nullopt;
  }
  return it->second;
}

// _____________________________________________________________________________
const std::string& LocalVocab::getWord(LocalVocabIndex localVocabIndex) const {
if (localVocabIndex.get() >= indexesToWordsMap_.size()) {
Expand Down
5 changes: 5 additions & 0 deletions src/engine/LocalVocab.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class LocalVocab {
LocalVocabIndex getIndexAndAddIfNotContained(const std::string& word);
LocalVocabIndex getIndexAndAddIfNotContained(std::string&& word);

// Get the index of a word in the local vocabulary, or std::nullopt if it is
// not contained. This is useful for testing.
std::optional<LocalVocabIndex> getIndexOrNullopt(
const std::string& word) const;

// The number of words in the vocabulary.
size_t size() const { return indexesToWordsMap_.size(); }

Expand Down
4 changes: 4 additions & 0 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "engine/NeutralElementOperation.h"
#include "engine/OptionalJoin.h"
#include "engine/OrderBy.h"
#include "engine/Service.h"
#include "engine/Sort.h"
#include "engine/TextOperationWithFilter.h"
#include "engine/TransitivePath.h"
Expand Down Expand Up @@ -191,6 +192,8 @@ void QueryExecutionTree::setOperation(std::shared_ptr<Op> operation) {
_type = DISTINCT;
} else if constexpr (std::is_same_v<Op, Values>) {
_type = VALUES;
} else if constexpr (std::is_same_v<Op, Service>) {
_type = SERVICE;
} else if constexpr (std::is_same_v<Op, TransitivePath>) {
_type = TRANSITIVE_PATH;
} else if constexpr (std::is_same_v<Op, OrderBy>) {
Expand Down Expand Up @@ -230,6 +233,7 @@ template void QueryExecutionTree::setOperation(std::shared_ptr<Bind>);
template void QueryExecutionTree::setOperation(std::shared_ptr<Sort>);
template void QueryExecutionTree::setOperation(std::shared_ptr<Distinct>);
template void QueryExecutionTree::setOperation(std::shared_ptr<Values>);
template void QueryExecutionTree::setOperation(std::shared_ptr<Service>);
template void QueryExecutionTree::setOperation(std::shared_ptr<TransitivePath>);
template void QueryExecutionTree::setOperation(std::shared_ptr<OrderBy>);
template void QueryExecutionTree::setOperation(std::shared_ptr<GroupBy>);
Expand Down
1 change: 1 addition & 0 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class QueryExecutionTree {
MULTICOLUMN_JOIN,
TRANSITIVE_PATH,
VALUES,
SERVICE,
BIND,
MINUS,
NEUTRAL_ELEMENT,
Expand Down
5 changes: 4 additions & 1 deletion src/engine/QueryPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <engine/OptionalJoin.h>
#include <engine/OrderBy.h>
#include <engine/QueryPlanner.h>
#include <engine/Service.h>
#include <engine/Sort.h>
#include <engine/TextOperationWithFilter.h>
#include <engine/TextOperationWithoutFilter.h>
Expand Down Expand Up @@ -445,7 +446,9 @@ std::vector<QueryPlanner::SubtreePlan> QueryPlanner::optimize(
SubtreePlan valuesPlan =
makeSubtreePlan<Values>(_qec, arg._inlineValues);
joinCandidates(std::vector{std::move(valuesPlan)});

} else if constexpr (std::is_same_v<T, p::Service>) {
SubtreePlan servicePlan = makeSubtreePlan<Service>(_qec, arg);
joinCandidates(std::vector{std::move(servicePlan)});
} else if constexpr (std::is_same_v<T, p::Bind>) {
// The logic of the BIND operation is implemented in the joinCandidates
// lambda. Reason: BIND does not add a new join operation like for the
Expand Down
192 changes: 192 additions & 0 deletions src/engine/Service.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2022 - 2023, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Hannah Bast (bast@cs.uni-freiburg.de)

#include "engine/Service.h"

#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "engine/CallFixedSize.h"
#include "engine/Values.h"
#include "engine/VariableToColumnMap.h"
#include "parser/TokenizerCtre.h"
#include "parser/TurtleParser.h"
#include "util/Exception.h"
#include "util/HashSet.h"
#include "util/http/HttpClient.h"
#include "util/http/HttpUtils.h"

// ____________________________________________________________________________
// Construct a SERVICE operation from the parsed SERVICE clause. The
// `getTsvFunction` is stored in `getTsvFunction_` and is the function that
// `computeResult` calls to send the query to the remote endpoint and obtain
// the TSV response. Both by-value arguments are moved into their members.
Service::Service(QueryExecutionContext* qec,
parsedQuery::Service parsedServiceClause,
GetTsvFunction getTsvFunction)
: Operation{qec},
parsedServiceClause_{std::move(parsedServiceClause)},
getTsvFunction_{std::move(getTsvFunction)} {}

// ____________________________________________________________________________
std::string Service::asStringImpl(size_t indent) const {
  const auto& clause = parsedServiceClause_;
  std::ostringstream out;
  // Prefix the representation with one space per indentation level.
  out << std::string(indent, ' ');
  // The representation consists of the service IRI, the prologue, and the
  // graph pattern of the SERVICE clause.
  // TODO: This duplicates code in GraphPatternOperation.cpp .
  out << "SERVICE " << clause.serviceIri_.toSparql() << " {\n";
  out << clause.prologue_ << "\n";
  out << clause.graphPatternAsString_ << "\n}\n";
  return std::move(out).str();
}

// ____________________________________________________________________________
std::string Service::getDescriptor() const {
  // Human-readable description: a fixed prefix followed by the service IRI in
  // its SPARQL form.
  const auto& serviceIri = parsedServiceClause_.serviceIri_;
  return absl::StrCat("Service with IRI ", serviceIri.toSparql());
}

// ____________________________________________________________________________
size_t Service::getResultWidth() const {
  // The result has one column per variable visible from the SERVICE clause.
  const auto& variables = parsedServiceClause_.visibleVariables_;
  return variables.size();
}

// ____________________________________________________________________________
VariableToColumnMap Service::computeVariableToColumnMap() const {
  // The i-th visible variable of the SERVICE clause is mapped to column i.
  VariableToColumnMap variableToColumn;
  size_t nextColumn = 0;
  for (const auto& variable : parsedServiceClause_.visibleVariables_) {
    variableToColumn[variable] = nextColumn;
    ++nextColumn;
  }
  return variableToColumn;
}

// ____________________________________________________________________________
float Service::getMultiplicity([[maybe_unused]] size_t col) {
  // TODO: Nothing is known about the multiplicities at query planning time,
  // so the same dummy value is reported for every column.
  constexpr float kDummyMultiplicity = 1;
  return kDummyMultiplicity;
}

// ____________________________________________________________________________
size_t Service::getSizeEstimate() {
  // TODO: Nothing is known about the result size at query planning time, so a
  // fixed dummy value is reported.
  constexpr size_t kDummySizeEstimate = 100'000;
  return kDummySizeEstimate;
}

// ____________________________________________________________________________
size_t Service::getCostEstimate() {
  // TODO: Nothing is known about the cost at query planning time, so the cost
  // is reported as ten times the (dummy) size estimate.
  const size_t estimatedSize = getSizeEstimate();
  return 10 * estimatedSize;
}

// ____________________________________________________________________________
void Service::computeResult(ResultTable* result) {
  const auto& visibleVariables = parsedServiceClause_.visibleVariables_;

  // The service IRI must be enclosed in angle brackets; strip them to obtain
  // the URL of the SPARQL endpoint.
  std::string_view serviceIriString = parsedServiceClause_.serviceIri_.iri();
  AD_CONTRACT_CHECK(serviceIriString.starts_with("<") &&
                    serviceIriString.ends_with(">"));
  serviceIriString.remove_prefix(1);
  serviceIriString.remove_suffix(1);
  ad_utility::httpUtils::Url serviceUrl{serviceIriString};

  // Build the query for the endpoint: the prologue, followed by a SELECT over
  // exactly the visible variables with the graph pattern as WHERE clause.
  const std::string selectedVariables =
      absl::StrJoin(visibleVariables, " ", Variable::AbslFormatter);
  const std::string serviceQuery = absl::StrCat(
      parsedServiceClause_.prologue_, "\nSELECT ", selectedVariables,
      " WHERE ", parsedServiceClause_.graphPatternAsString_);
  LOG(INFO) << "Sending SERVICE query to remote endpoint "
            << "(protocol: " << serviceUrl.protocolAsString()
            << ", host: " << serviceUrl.host()
            << ", port: " << serviceUrl.port()
            << ", target: " << serviceUrl.target() << ")" << std::endl
            << serviceQuery << std::endl;

  // Issue a POST request via `getTsvFunction_` and ask for the result as TSV.
  //
  // TODO: We should support a timeout here.
  //
  // TODO: TSV is compact and easy to parse, but it might not be the best
  // choice regarding robustness and portability. In particular, we are not
  // sure how deterministic the TSV output is with respect to the precise
  // encoding of literals.
  std::istringstream tsvResult =
      getTsvFunction_(serviceUrl, boost::beast::http::verb::post, serviceQuery,
                      "application/sparql-query", "text/tab-separated-values");

  // The first line of the TSV holds the variable names. A response without
  // any line at all is an error.
  std::string headerLine;
  const bool hasHeaderLine =
      static_cast<bool>(std::getline(tsvResult, headerLine));
  if (!hasHeaderLine) {
    throw std::runtime_error(absl::StrCat("Response from SPARQL endpoint ",
                                          serviceUrl.host(), " is empty"));
  }
  LOG(INFO) << "Header row of TSV result: " << headerLine << std::endl;

  // The header must list exactly the requested variables, tab-separated and
  // in the requested order.
  const std::string expectedHeaderLine =
      absl::StrJoin(visibleVariables, "\t", Variable::AbslFormatter);
  if (headerLine != expectedHeaderLine) {
    throw std::runtime_error(absl::StrCat(
        "Header row of TSV result for SERVICE query is \"", headerLine,
        "\", but expected \"", expectedHeaderLine, "\""));
  }

  // Set the basic properties of the result table (the `_resultTypes` don't
  // matter, as long as they have the right size, see `ResultTypes.h`).
  size_t resWidth = getResultWidth();
  result->_sortedBy = resultSortedOn();
  result->_idTable.setNumColumns(resWidth);
  result->_resultTypes.resize(visibleVariables.size(),
                              ResultTable::ResultType::KB);

  // Parse the remaining lines into the result table, see `writeTsvResult`.
  CALL_FIXED_SIZE(resWidth, &Service::writeTsvResult, this,
                  std::move(tsvResult), result);
}

// ____________________________________________________________________________
// Parse the non-header rows of `tsvResult` and write them to `result`. The
// template parameter `I` is the width of the result table. Each row is split
// at tabs, each cell is parsed with
// `TurtleStringParser<TokenizerCtre>::parseTripleObject` and converted to an
// `Id` using the index vocabulary and the local vocabulary of `result`.
//
// Throws `std::runtime_error` if a row does not have exactly one value per
// visible variable of the SERVICE clause.
template <size_t I>
void Service::writeTsvResult(std::istringstream tsvResult,
                             ResultTable* result) {
  IdTableStatic<I> idTable = std::move(result->_idTable).toStatic<I>();
  size_t rowIdx = 0;
  // For each column, the number of entries that ended up in the local
  // vocabulary (only used for the log message at the end).
  std::vector<size_t> numLocalVocabPerColumn(idTable.numColumns());
  std::string line;
  std::string lastLine;
  const size_t numVariables = parsedServiceClause_.visibleVariables_.size();
  // Before each read, the previously read line is moved to `lastLine`, so that
  // after the loop `lastLine` holds the last row that was read successfully.
  while (lastLine = std::move(line), std::getline(tsvResult, line)) {
    // Log the first row.
    if (rowIdx == 0) {
      LOG(INFO) << "First non-header row of TSV result: " << line << std::endl;
    }
    std::vector<std::string_view> valueStrings = absl::StrSplit(line, "\t");
    if (valueStrings.size() != numVariables) {
      // The row number in the message is 1-based and does not count the
      // header row.
      throw std::runtime_error(absl::StrCat(
          "Number of columns in row ", rowIdx + 1, " of TSV result is ",
          valueStrings.size(), ", but number of variables in header row is ",
          numVariables));
    }
    idTable.emplace_back();
    for (size_t colIdx = 0; colIdx < valueStrings.size(); colIdx++) {
      TripleComponent tc = TurtleStringParser<TokenizerCtre>::parseTripleObject(
          valueStrings[colIdx]);
      Id id = std::move(tc).toValueId(getIndex().getVocab(),
                                      result->localVocabNonConst());
      idTable(rowIdx, colIdx) = id;
      if (id.getDatatype() == Datatype::LocalVocabIndex) {
        ++numLocalVocabPerColumn[colIdx];
      }
    }
    rowIdx++;
  }
  // With at most one row, the last row is either absent or identical to the
  // first row that was already logged above.
  if (idTable.size() > 1) {
    LOG(INFO) << "Last non-header row of TSV result: " << lastLine << std::endl;
  }
  AD_CORRECTNESS_CHECK(rowIdx == idTable.size());
  LOG(INFO) << "Number of rows in result: " << idTable.size() << std::endl;
  LOG(INFO) << "Number of entries in local vocabulary per column: "
            << absl::StrJoin(numLocalVocabPerColumn, ", ") << std::endl;
  result->_idTable = std::move(idTable).toDynamic();
}
91 changes: 91 additions & 0 deletions src/engine/Service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2022 - 2023, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Hannah Bast (bast@cs.uni-freiburg.de)

#pragma once

#include <functional>

#include "engine/Operation.h"
#include "engine/Values.h"
#include "parser/ParsedQuery.h"
#include "util/http/HttpClient.h"

// The SERVICE operation. Sends a query to the remote endpoint specified by the
// service IRI, gets the result as TSV, parses it, and writes it into a result
// table.
//
// TODO: The current implementation works, but is preliminary in several
// respects:
//
// 1. Reading the result as TSV has potential problems (see comment in
// `computeResult` for details).
//
// 2. There should be a timeout.
//
// 3. A variable in place of the IRI is not yet supported.
//
// 4. The SERVICE is currently executed *after* the query planning. The
// estimates of the result size, cost, and multiplicities are therefore dummy
// values.
//
class Service : public Operation {
public:
// The type of the function used to obtain the results, see below. The
// arguments are passed by `computeResult` in this order: the URL of the
// endpoint, the HTTP verb, the query string, the value for the `Content-Type`
// header, and the value for the `Accept` header. The returned stream holds the
// response.
using GetTsvFunction = std::function<std::istringstream(
ad_utility::httpUtils::Url, const boost::beast::http::verb&,
std::string_view, std::string_view, std::string_view)>;

private:
// The parsed SERVICE clause.
parsedQuery::Service parsedServiceClause_;

// The function used to obtain the result from the remote endpoint.
GetTsvFunction getTsvFunction_;

public:
// Construct from parsed Service clause.
//
// NOTE: The third argument is the function used to obtain the result from the
// remote endpoint. The default is to use `httpUtils::sendHttpOrHttpsRequest`,
// but in our tests (`ServiceTest`) we use a mock function that does not
// require a running `HttpServer`.
Service(QueryExecutionContext* qec, parsedQuery::Service parsedServiceClause,
GetTsvFunction getTsvFunction = sendHttpOrHttpsRequest);

// Methods inherited from base class `Operation`.
std::string getDescriptor() const override;
size_t getResultWidth() const override;
// The result is reported as not sorted on any column.
std::vector<size_t> resultSortedOn() const override { return {}; }
float getMultiplicity(size_t col) override;
size_t getSizeEstimate() override;
size_t getCostEstimate() override;
VariableToColumnMap computeVariableToColumnMap() const override;

// Not relevant for SERVICE.
void setTextLimit([[maybe_unused]] size_t limit) override {}

// We know nothing about the result at query planning time.
bool knownEmptyResult() override { return false; }

// A SERVICE clause has no children.
vector<QueryExecutionTree*> getChildren() override { return {}; }

private:
// The string returned by this function is used as cache key.
std::string asStringImpl(size_t indent = 0) const override;

// Compute the result using `getTsvFunction_`.
void computeResult(ResultTable* result) override;

// Write the given TSV result to the given result object. The `I` is the width
// of the result table.
//
// NOTE: This is similar to `Values::writeValues`, except that we have to
// parse TSV here and not a VALUES clause. Note that the only reason that
// `tsvResult` is not `const` here is because the method iterates over the
// `std::istringstream` and thus changes it.
template <size_t I>
void writeTsvResult(std::istringstream tsvResult, ResultTable* result);
};
3 changes: 3 additions & 0 deletions src/parser/GraphPatternOperation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ void GraphPatternOperation::toString(std::ostringstream& os,
} else if constexpr (std::is_same_v<T, Values>) {
os << "VALUES (" << arg._inlineValues.variablesToString() << ") "
<< arg._inlineValues.valuesToString();
} else if constexpr (std::is_same_v<T, Service>) {
os << "SERVICE " << arg.serviceIri_.toSparql() << " { "
<< arg.graphPatternAsString_ << " }";
} else if constexpr (std::is_same_v<T, BasicGraphPattern>) {
for (size_t i = 0; i + 1 < arg._triples.size(); ++i) {
os << "\n";
Expand Down
Loading

0 comments on commit 857099c

Please sign in to comment.