From e05d54a0abbf411c69228010a16d8ff730d5398f Mon Sep 17 00:00:00 2001 From: Joe Abraham Date: Tue, 11 Feb 2025 13:48:36 +0530 Subject: [PATCH] Optimise remote server --- velox/functions/remote/server/CMakeLists.txt | 20 ++---- .../server/RemoteFunctionBaseService.cpp | 63 +++++++++++++++++-- .../remote/server/RemoteFunctionBaseService.h | 10 +-- .../server/RemoteFunctionRestService.cpp | 17 +++-- .../remote/server/RemoteFunctionRestService.h | 1 - ....cpp => RemoteFunctionRestServiceMain.cpp} | 0 .../remote/server/RemoteFunctionService.cpp | 20 ++---- .../remote/server/RemoteFunctionService.h | 2 - velox/functions/remote/utils/CMakeLists.txt | 3 +- .../remote/utils/RemoteFunctionHelper.cpp | 61 ------------------ .../remote/utils/RemoteFunctionHelper.h | 39 ------------ 11 files changed, 81 insertions(+), 155 deletions(-) rename velox/functions/remote/server/{remoteFunctionRestServiceMain.cpp => RemoteFunctionRestServiceMain.cpp} (100%) delete mode 100644 velox/functions/remote/utils/RemoteFunctionHelper.cpp delete mode 100644 velox/functions/remote/utils/RemoteFunctionHelper.h diff --git a/velox/functions/remote/server/CMakeLists.txt b/velox/functions/remote/server/CMakeLists.txt index ebbaff9098a4..a8a546b279d6 100644 --- a/velox/functions/remote/server/CMakeLists.txt +++ b/velox/functions/remote/server/CMakeLists.txt @@ -14,17 +14,13 @@ add_library(velox_functions_remote_server_base RemoteFunctionBaseService.cpp) target_link_libraries( - velox_functions_remote_server_base - PUBLIC velox_memory velox_expression) + velox_functions_remote_server_base velox_expression velox_type_fbhive) add_library(velox_functions_remote_server RemoteFunctionService.cpp) target_link_libraries( velox_functions_remote_server - PUBLIC remote_function_thrift - velox_functions_remote_get_serde - velox_functions_remote_utils - velox_type_fbhive + PUBLIC remote_function_thrift velox_functions_remote_get_serde velox_functions_remote_server_base) add_executable(velox_functions_remote_server_main RemoteFunctionServiceMain.cpp) @@ -35,15 +31,11 @@ target_link_libraries( add_library(velox_functions_remote_server_rest RemoteFunctionRestService.cpp) target_link_libraries( - velox_functions_remote_server_rest - PUBLIC velox_functions_remote_get_serde - velox_functions_remote_utils - velox_type_fbhive - velox_functions_prestosql - velox_functions_remote_server_base) + velox_functions_remote_server_rest velox_presto_serializer + velox_functions_remote_server_base) add_executable(velox_functions_remote_server_rest_main RemoteFunctionRestServiceMain.cpp) target_link_libraries( - velox_functions_remote_server_rest_main - PRIVATE velox_functions_remote_server_rest) + velox_functions_remote_server_rest_main velox_functions_remote_server_rest + velox_functions_prestosql) diff --git a/velox/functions/remote/server/RemoteFunctionBaseService.cpp b/velox/functions/remote/server/RemoteFunctionBaseService.cpp index ebf9e8910ad0..69fb5e4a9bbd 100644 --- a/velox/functions/remote/server/RemoteFunctionBaseService.cpp +++ b/velox/functions/remote/server/RemoteFunctionBaseService.cpp @@ -16,27 +16,78 @@ #include "velox/functions/remote/server/RemoteFunctionBaseService.h" +#include "velox/type/fbhive/HiveTypeParser.h" + namespace facebook::velox::functions { +namespace { -RowVectorPtr RemoteFunctionBaseService::invokeFunctionInternal( - const RowVectorPtr& inputVector, +inline std::string getFunctionName( + const std::string& prefix, + const std::string& functionName) { + return prefix.empty() ? functionName + : fmt::format("{}.{}", prefix, functionName); +} + +inline TypePtr deserializeType(const std::string& input) { + // Use hive type parser/serializer. + return type::fbhive::HiveTypeParser().parse(input); +} + +inline RowTypePtr deserializeArgTypes( + const std::vector& argTypes) { + const size_t argCount = argTypes.size(); + + std::vector argumentTypes; + std::vector typeNames; + argumentTypes.reserve(argCount); + typeNames.reserve(argCount); + + for (size_t i = 0; i < argCount; ++i) { + argumentTypes.emplace_back(deserializeType(argTypes[i])); + typeNames.emplace_back(fmt::format("c{}", i)); + } + return ROW(std::move(typeNames), std::move(argumentTypes)); +} + +inline std::vector getExpressions( const RowTypePtr& inputType, - const TypePtr& outputType, + const TypePtr& returnType, + const std::string& functionName) { + std::vector inputs; + for (size_t i = 0; i < inputType->size(); ++i) { + inputs.push_back(std::make_shared( + inputType->childAt(i), inputType->nameOf(i))); + } + + return {std::make_shared( + returnType, std::move(inputs), functionName)}; +} + +} // namespace + +RowVectorPtr RemoteFunctionBaseService::invokeFunctionInternal( + const folly::IOBuf& payload, + const std::vector& argTypeNames, + const std::string& returnTypeName, const std::string& functionName, - bool throwOnError) { + bool throwOnError, + VectorSerde* serde) { + auto inputType = deserializeArgTypes(argTypeNames); + auto outputType = deserializeType(returnTypeName); + + auto inputVector = IOBufToRowVector(payload, inputType, *pool_, serde); + const vector_size_t numRows = inputVector->size(); SelectivityVector rows{numRows}; queryCtx_ = core::QueryCtx::create(); execCtx_ = std::make_unique(pool_.get(), queryCtx_.get()); - exec::ExprSet exprSet{ getExpressions( inputType, outputType, getFunctionName(functionPrefix_, functionName)), execCtx_.get()}; - evalCtx_ = std::make_unique( execCtx_.get(), &exprSet, inputVector.get()); *evalCtx_->mutableThrowOnError() = throwOnError; diff --git a/velox/functions/remote/server/RemoteFunctionBaseService.h b/velox/functions/remote/server/RemoteFunctionBaseService.h index 2fbdbb7fd2ed..a4a126ebd3e8 100644 --- a/velox/functions/remote/server/RemoteFunctionBaseService.h +++ b/velox/functions/remote/server/RemoteFunctionBaseService.h @@ -18,7 +18,6 @@ #include #include "velox/common/memory/Memory.h" #include "velox/expression/Expr.h" -#include "velox/functions/remote/utils/RemoteFunctionHelper.h" #include "velox/vector/VectorStream.h" namespace facebook::velox::functions { @@ -38,11 +37,12 @@ class RemoteFunctionBaseService { } RowVectorPtr invokeFunctionInternal( - const RowVectorPtr& inputVector, - const RowTypePtr& inputType, - const TypePtr& outputType, + const folly::IOBuf& payload, + const std::vector& argTypeNames, + const std::string& returnTypeName, const std::string& functionName, - bool throwOnError); + bool throwOnError, + VectorSerde* serde); exec::EvalErrors* getEvalErrors_() { return evalCtx_ ? evalCtx_->errors() : nullptr; diff --git a/velox/functions/remote/server/RemoteFunctionRestService.cpp b/velox/functions/remote/server/RemoteFunctionRestService.cpp index fcfb0a57b096..e7f4c392d790 100644 --- a/velox/functions/remote/server/RemoteFunctionRestService.cpp +++ b/velox/functions/remote/server/RemoteFunctionRestService.cpp @@ -17,10 +17,7 @@ #include "velox/functions/remote/server/RemoteFunctionRestService.h" #include -#include -#include "velox/expression/Expr.h" -#include "velox/functions/remote/utils/RemoteFunctionHelper.h" -#include "velox/vector/VectorStream.h" +#include "velox/serializers/PrestoSerializer.h" namespace facebook::velox::functions { @@ -132,16 +129,16 @@ void RestSession::handleRequest( const auto& functionSignature = internalFunctionSignatureMap.at(functionName); - auto inputType = deserializeArgTypes(functionSignature.argumentTypes); - auto returnType = deserializeType(functionSignature.returnType); - serializer::presto::PrestoVectorSerde serde; auto inputBuffer = folly::IOBuf::copyBuffer(req.body()); - auto inputVector = - IOBufToRowVector(*inputBuffer, inputType, *pool_, &serde); auto outputRowVector = invokeFunctionInternal( - inputVector, inputType, returnType, functionName, true); + *inputBuffer, + functionSignature.argumentTypes, + functionSignature.returnType, + functionName, + true, + &serde); auto payload = rowVectorToIOBuf( outputRowVector, outputRowVector->size(), *pool_, &serde); diff --git a/velox/functions/remote/server/RemoteFunctionRestService.h b/velox/functions/remote/server/RemoteFunctionRestService.h index e429119e3e2a..14a344b1ff72 100644 --- a/velox/functions/remote/server/RemoteFunctionRestService.h +++ b/velox/functions/remote/server/RemoteFunctionRestService.h @@ -19,7 +19,6 @@ #include #include #include -#include "velox/common/memory/Memory.h" #include "velox/functions/remote/server/RemoteFunctionBaseService.h" namespace facebook::velox::functions { diff --git a/velox/functions/remote/server/remoteFunctionRestServiceMain.cpp b/velox/functions/remote/server/RemoteFunctionRestServiceMain.cpp similarity index 100% rename from velox/functions/remote/server/remoteFunctionRestServiceMain.cpp rename to velox/functions/remote/server/RemoteFunctionRestServiceMain.cpp diff --git a/velox/functions/remote/server/RemoteFunctionService.cpp b/velox/functions/remote/server/RemoteFunctionService.cpp index 14760227df27..ba5d3cebdbe7 100644 --- a/velox/functions/remote/server/RemoteFunctionService.cpp +++ b/velox/functions/remote/server/RemoteFunctionService.cpp @@ -16,11 +16,7 @@ #include "velox/functions/remote/server/RemoteFunctionService.h" #include "velox/common/base/Exceptions.h" -#include "velox/expression/Expr.h" #include "velox/functions/remote/if/GetSerde.h" -#include "velox/functions/remote/utils/RemoteFunctionHelper.h" -#include "velox/type/fbhive/HiveTypeParser.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::functions { @@ -69,22 +65,16 @@ void RemoteFunctionServiceHandler::invokeFunction( const auto& functionHandle = request->get_remoteFunctionHandle(); const auto& inputs = request->get_inputs(); - // Deserialize types and data. - auto inputType = deserializeArgTypes(functionHandle.get_argumentTypes()); - auto outputType = deserializeType(functionHandle.get_returnType()); - auto serdeFormat = inputs.get_pageFormat(); auto serde = getSerde(serdeFormat); - auto inputVector = - IOBufToRowVector(inputs.get_payload(), inputType, *pool_, serde.get()); - auto outputRowVector = invokeFunctionInternal( - inputVector, - inputType, - outputType, + inputs.get_payload(), + functionHandle.get_argumentTypes(), + functionHandle.get_returnType(), functionHandle.get_name(), - request->get_throwOnError()); + request->get_throwOnError(), + serde.get()); auto result = response.result_ref(); result->rowCount_ref() = outputRowVector->size(); diff --git a/velox/functions/remote/server/RemoteFunctionService.h b/velox/functions/remote/server/RemoteFunctionService.h index 40fcd665dfee..937c45287333 100644 --- a/velox/functions/remote/server/RemoteFunctionService.h +++ b/velox/functions/remote/server/RemoteFunctionService.h @@ -17,10 +17,8 @@ #pragma once #include -#include "velox/common/memory/Memory.h" #include "velox/functions/remote/if/gen-cpp2/RemoteFunctionService.h" #include "velox/functions/remote/server/RemoteFunctionBaseService.h" -#include "velox/vector/VectorStream.h" namespace facebook::velox::exec { class EvalErrors; diff --git a/velox/functions/remote/utils/CMakeLists.txt b/velox/functions/remote/utils/CMakeLists.txt index 6e033822554d..51192297cf0c 100644 --- a/velox/functions/remote/utils/CMakeLists.txt +++ b/velox/functions/remote/utils/CMakeLists.txt @@ -12,8 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_functions_remote_utils RemoteFunctionHelper.cpp - RemoteFunctionServiceProvider.cpp) +add_library(velox_functions_remote_utils RemoteFunctionServiceProvider.cpp) target_link_libraries( velox_functions_remote_utils diff --git a/velox/functions/remote/utils/RemoteFunctionHelper.cpp b/velox/functions/remote/utils/RemoteFunctionHelper.cpp deleted file mode 100644 index 31851cc35744..000000000000 --- a/velox/functions/remote/utils/RemoteFunctionHelper.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/functions/remote/utils/RemoteFunctionHelper.h" - -namespace facebook::velox::functions { -std::string getFunctionName( - const std::string& prefix, - const std::string& functionName) { - return prefix.empty() ? functionName - : fmt::format("{}.{}", prefix, functionName); -} - -TypePtr deserializeType(const std::string& input) { - // Use hive type parser/serializer. - return type::fbhive::HiveTypeParser().parse(input); -} - -RowTypePtr deserializeArgTypes(const std::vector& argTypes) { - const size_t argCount = argTypes.size(); - - std::vector argumentTypes; - std::vector typeNames; - argumentTypes.reserve(argCount); - typeNames.reserve(argCount); - - for (size_t i = 0; i < argCount; ++i) { - argumentTypes.emplace_back(deserializeType(argTypes[i])); - typeNames.emplace_back(fmt::format("c{}", i)); - } - return ROW(std::move(typeNames), std::move(argumentTypes)); -} - -std::vector getExpressions( - const RowTypePtr& inputType, - const TypePtr& returnType, - const std::string& functionName) { - std::vector inputs; - for (size_t i = 0; i < inputType->size(); ++i) { - inputs.push_back(std::make_shared( - inputType->childAt(i), inputType->nameOf(i))); - } - - return {std::make_shared( - returnType, std::move(inputs), functionName)}; -} - -} // namespace facebook::velox::functions diff --git a/velox/functions/remote/utils/RemoteFunctionHelper.h b/velox/functions/remote/utils/RemoteFunctionHelper.h deleted file mode 100644 index cb6af2f7a34d..000000000000 --- a/velox/functions/remote/utils/RemoteFunctionHelper.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include "velox/expression/Expr.h" -#include "velox/type/Type.h" -#include "velox/type/fbhive/HiveTypeParser.h" - -namespace facebook::velox::functions { -std::string getFunctionName( - const std::string& prefix, - const std::string& functionName); - -TypePtr deserializeType(const std::string& input); - -RowTypePtr deserializeArgTypes(const std::vector& argTypes); - -std::vector getExpressions( - const RowTypePtr& inputType, - const TypePtr& returnType, - const std::string& functionName); - -} // namespace facebook::velox::functions