Skip to content

Commit

Permalink
Optimise remote server
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Feb 11, 2025
1 parent bdb117c commit e05d54a
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 155 deletions.
20 changes: 6 additions & 14 deletions velox/functions/remote/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,13 @@

add_library(velox_functions_remote_server_base RemoteFunctionBaseService.cpp)
target_link_libraries(
velox_functions_remote_server_base
PUBLIC velox_memory velox_expression)
velox_functions_remote_server_base velox_expression velox_type_fbhive)

add_library(velox_functions_remote_server RemoteFunctionService.cpp)

target_link_libraries(
velox_functions_remote_server
PUBLIC remote_function_thrift
velox_functions_remote_get_serde
velox_functions_remote_utils
velox_type_fbhive
PUBLIC remote_function_thrift velox_functions_remote_get_serde
velox_functions_remote_server_base)

add_executable(velox_functions_remote_server_main RemoteFunctionServiceMain.cpp)
Expand All @@ -35,15 +31,11 @@ target_link_libraries(

add_library(velox_functions_remote_server_rest RemoteFunctionRestService.cpp)
target_link_libraries(
velox_functions_remote_server_rest
PUBLIC velox_functions_remote_get_serde
velox_functions_remote_utils
velox_type_fbhive
velox_functions_prestosql
velox_functions_remote_server_base)
velox_functions_remote_server_rest velox_presto_serializer
velox_functions_remote_server_base)

add_executable(velox_functions_remote_server_rest_main
RemoteFunctionRestServiceMain.cpp)
target_link_libraries(
velox_functions_remote_server_rest_main
PRIVATE velox_functions_remote_server_rest)
velox_functions_remote_server_rest_main velox_functions_remote_server_rest
velox_functions_prestosql)
63 changes: 57 additions & 6 deletions velox/functions/remote/server/RemoteFunctionBaseService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,78 @@

#include "velox/functions/remote/server/RemoteFunctionBaseService.h"

#include "velox/type/fbhive/HiveTypeParser.h"

namespace facebook::velox::functions {
namespace {

RowVectorPtr RemoteFunctionBaseService::invokeFunctionInternal(
const RowVectorPtr& inputVector,
/// Returns the name used to look up a function: `functionName` unchanged when
/// `prefix` is empty, otherwise "<prefix>.<functionName>".
inline std::string getFunctionName(
    const std::string& prefix,
    const std::string& functionName) {
  if (prefix.empty()) {
    return functionName;
  }
  return fmt::format("{}.{}", prefix, functionName);
}

/// Parses a serialized type string into a Velox type using the Hive type
/// parser.
inline TypePtr deserializeType(const std::string& input) {
  type::fbhive::HiveTypeParser hiveParser{};
  return hiveParser.parse(input);
}

/// Converts a list of serialized argument types into a row type. Each entry is
/// parsed with deserializeType() and becomes one column; columns are named
/// "c0", "c1", ... in argument order.
inline RowTypePtr deserializeArgTypes(
    const std::vector<std::string>& argTypes) {
  std::vector<std::string> columnNames;
  std::vector<TypePtr> columnTypes;
  columnNames.reserve(argTypes.size());
  columnTypes.reserve(argTypes.size());

  for (const auto& serializedType : argTypes) {
    // Parse the type before generating the name, so a parse failure surfaces
    // at the same point as before. The current name count is this column's
    // zero-based position.
    columnTypes.push_back(deserializeType(serializedType));
    columnNames.push_back(fmt::format("c{}", columnNames.size()));
  }
  return ROW(std::move(columnNames), std::move(columnTypes));
}

// Builds the expression list evaluated for one remote call. The result holds a
// single CallTypedExpr named `functionName` and typed `returnType`, whose
// arguments are field accesses over the columns of `inputType`, in column
// order.
// NOTE(review): `outputType` is never read in this body. This text comes from a
// rendered diff, so that parameter line may be the removed counterpart of
// `returnType` rather than a real fourth parameter -- confirm against the
// actual file before relying on this signature.
inline std::vector<core::TypedExprPtr> getExpressions(
const RowTypePtr& inputType,
const TypePtr& outputType,
const TypePtr& returnType,
const std::string& functionName) {
std::vector<core::TypedExprPtr> inputs;
// One field-access expression per input column, carrying that column's own
// type and name.
for (size_t i = 0; i < inputType->size(); ++i) {
inputs.push_back(std::make_shared<core::FieldAccessTypedExpr>(
inputType->childAt(i), inputType->nameOf(i)));
}

// The argument list is moved into the call expression; the returned vector
// contains exactly this one expression.
return {std::make_shared<core::CallTypedExpr>(
returnType, std::move(inputs), functionName)};
}

} // namespace

RowVectorPtr RemoteFunctionBaseService::invokeFunctionInternal(
const folly::IOBuf& payload,
const std::vector<std::string>& argTypeNames,
const std::string& returnTypeName,
const std::string& functionName,
bool throwOnError) {
bool throwOnError,
VectorSerde* serde) {
auto inputType = deserializeArgTypes(argTypeNames);
auto outputType = deserializeType(returnTypeName);

auto inputVector = IOBufToRowVector(payload, inputType, *pool_, serde);

const vector_size_t numRows = inputVector->size();
SelectivityVector rows{numRows};

queryCtx_ = core::QueryCtx::create();
execCtx_ = std::make_unique<core::ExecCtx>(pool_.get(), queryCtx_.get());

exec::ExprSet exprSet{
getExpressions(
inputType,
outputType,
getFunctionName(functionPrefix_, functionName)),
execCtx_.get()};

evalCtx_ = std::make_unique<exec::EvalCtx>(
execCtx_.get(), &exprSet, inputVector.get());
*evalCtx_->mutableThrowOnError() = throwOnError;
Expand Down
10 changes: 5 additions & 5 deletions velox/functions/remote/server/RemoteFunctionBaseService.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <memory>
#include "velox/common/memory/Memory.h"
#include "velox/expression/Expr.h"
#include "velox/functions/remote/utils/RemoteFunctionHelper.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::functions {
Expand All @@ -38,11 +37,12 @@ class RemoteFunctionBaseService {
}

RowVectorPtr invokeFunctionInternal(
const RowVectorPtr& inputVector,
const RowTypePtr& inputType,
const TypePtr& outputType,
const folly::IOBuf& payload,
const std::vector<std::string>& argTypeNames,
const std::string& returnTypeName,
const std::string& functionName,
bool throwOnError);
bool throwOnError,
VectorSerde* serde);

exec::EvalErrors* getEvalErrors_() {
return evalCtx_ ? evalCtx_->errors() : nullptr;
Expand Down
17 changes: 7 additions & 10 deletions velox/functions/remote/server/RemoteFunctionRestService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
#include "velox/functions/remote/server/RemoteFunctionRestService.h"

#include <boost/beast/version.hpp>
#include <serializers/PrestoSerializer.h>
#include "velox/expression/Expr.h"
#include "velox/functions/remote/utils/RemoteFunctionHelper.h"
#include "velox/vector/VectorStream.h"
#include "velox/serializers/PrestoSerializer.h"

namespace facebook::velox::functions {

Expand Down Expand Up @@ -132,16 +129,16 @@ void RestSession::handleRequest(
const auto& functionSignature =
internalFunctionSignatureMap.at(functionName);

auto inputType = deserializeArgTypes(functionSignature.argumentTypes);
auto returnType = deserializeType(functionSignature.returnType);

serializer::presto::PrestoVectorSerde serde;
auto inputBuffer = folly::IOBuf::copyBuffer(req.body());
auto inputVector =
IOBufToRowVector(*inputBuffer, inputType, *pool_, &serde);

auto outputRowVector = invokeFunctionInternal(
inputVector, inputType, returnType, functionName, true);
*inputBuffer,
functionSignature.argumentTypes,
functionSignature.returnType,
functionName,
true,
&serde);

auto payload = rowVectorToIOBuf(
outputRowVector, outputRowVector->size(), *pool_, &serde);
Expand Down
1 change: 0 additions & 1 deletion velox/functions/remote/server/RemoteFunctionRestService.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <boost/asio.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include "velox/common/memory/Memory.h"
#include "velox/functions/remote/server/RemoteFunctionBaseService.h"

namespace facebook::velox::functions {
Expand Down
20 changes: 5 additions & 15 deletions velox/functions/remote/server/RemoteFunctionService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@

#include "velox/functions/remote/server/RemoteFunctionService.h"
#include "velox/common/base/Exceptions.h"
#include "velox/expression/Expr.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/utils/RemoteFunctionHelper.h"
#include "velox/type/fbhive/HiveTypeParser.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::functions {

Expand Down Expand Up @@ -69,22 +65,16 @@ void RemoteFunctionServiceHandler::invokeFunction(
const auto& functionHandle = request->get_remoteFunctionHandle();
const auto& inputs = request->get_inputs();

// Deserialize types and data.
auto inputType = deserializeArgTypes(functionHandle.get_argumentTypes());
auto outputType = deserializeType(functionHandle.get_returnType());

auto serdeFormat = inputs.get_pageFormat();
auto serde = getSerde(serdeFormat);

auto inputVector =
IOBufToRowVector(inputs.get_payload(), inputType, *pool_, serde.get());

auto outputRowVector = invokeFunctionInternal(
inputVector,
inputType,
outputType,
inputs.get_payload(),
functionHandle.get_argumentTypes(),
functionHandle.get_returnType(),
functionHandle.get_name(),
request->get_throwOnError());
request->get_throwOnError(),
serde.get());

auto result = response.result_ref();
result->rowCount_ref() = outputRowVector->size();
Expand Down
2 changes: 0 additions & 2 deletions velox/functions/remote/server/RemoteFunctionService.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
#pragma once

#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "velox/common/memory/Memory.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionService.h"
#include "velox/functions/remote/server/RemoteFunctionBaseService.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::exec {
class EvalErrors;
Expand Down
3 changes: 1 addition & 2 deletions velox/functions/remote/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_functions_remote_utils RemoteFunctionHelper.cpp
RemoteFunctionServiceProvider.cpp)
add_library(velox_functions_remote_utils RemoteFunctionServiceProvider.cpp)

target_link_libraries(
velox_functions_remote_utils
Expand Down
61 changes: 0 additions & 61 deletions velox/functions/remote/utils/RemoteFunctionHelper.cpp

This file was deleted.

39 changes: 0 additions & 39 deletions velox/functions/remote/utils/RemoteFunctionHelper.h

This file was deleted.

0 comments on commit e05d54a

Please sign in to comment.