From 75ccdfed2fed13106bed17d4d6498bdb92291e9c Mon Sep 17 00:00:00 2001 From: Joe Abraham Date: Tue, 3 Sep 2024 09:06:06 +0530 Subject: [PATCH] add proxygen --- .gitignore | 2 ++ scripts/setup-macos.sh | 37 ++++++++++++++---------- velox/functions/remote/CMakeLists.txt | 12 ++++++++ velox/functions/remote/client/Remote.cpp | 34 +++++++++++++++------- velox/functions/remote/client/Remote.h | 21 ++++++++++++-- 5 files changed, 77 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index 913a149b9edac..f2b64bee7a554 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,8 @@ projects/* !projects/*.* !projects/Makefile .venv +deps-install +deps-download #==============================================================================# # Autotools artifacts diff --git a/scripts/setup-macos.sh b/scripts/setup-macos.sh index fd5e61012cda2..587a4f5ef61d5 100755 --- a/scripts/setup-macos.sh +++ b/scripts/setup-macos.sh @@ -35,16 +35,17 @@ PYTHON_VENV=${PYHTON_VENV:-"${SCRIPTDIR}/../.venv"} NPROC=$(getconf _NPROCESSORS_ONLN) DEPENDENCY_DIR=${DEPENDENCY_DIR:-$(pwd)} -MACOS_VELOX_DEPS="bison boost double-conversion flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 simdjson snappy thrift xz xsimd zstd" +MACOS_VELOX_DEPS="bison boost double-conversion flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 simdjson snappy thrift xz zstd" +MACOS_VELOX_DEPS="bison flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 snappy xz zstd" MACOS_BUILD_DEPS="ninja cmake" FB_OS_VERSION="v2024.05.20.00" FMT_VERSION="10.1.1" +XSIMD_VERSION="10.0.0" function update_brew { DEFAULT_BREW_PATH=/usr/local/bin/brew - if [ `arch` == "arm64" ] ; - then - DEFAULT_BREW_PATH=$(which brew) ; + if [ "$(arch)" == "arm64" ]; then + DEFAULT_BREW_PATH=$(which brew) fi BREW_PATH=${BREW_PATH:-$DEFAULT_BREW_PATH} $BREW_PATH update --auto-update --verbose @@ -53,25 +54,20 @@ function update_brew { function install_from_brew { pkg=$1 - if [[ "${pkg}" =~ ^([0-9a-z-]*):([0-9](\.[0-9\])*)$ ]]; - then + if [[ "${pkg}" =~ ^([0-9a-z-]*):([0-9](\.[0-9\])*)$ ]]; then pkg=${BASH_REMATCH[1]} ver=${BASH_REMATCH[2]} echo "Installing '${pkg}' at '${ver}'" tap="velox/local-${pkg}" brew tap-new "${tap}" brew extract "--version=${ver}" "${pkg}" "${tap}" - brew install "${tap}/${pkg}@${ver}" || ( echo "Failed to install ${tap}/${pkg}@${ver}" ; exit 1 ) + brew install "${tap}/${pkg}@${ver}" || { echo "Failed to install ${tap}/${pkg}@${ver}"; exit 1; } else - ( brew install --formula "${pkg}" && echo "Installation of ${pkg} is successful" || brew upgrade --formula "$pkg" ) || ( echo "Failed to install ${pkg}" ; exit 1 ) + (brew install --formula "${pkg}" && echo "Installation of ${pkg} is successful") || brew upgrade --formula "${pkg}" || { echo "Failed to install ${pkg}"; exit 1; } fi } function install_build_prerequisites { - for pkg in ${MACOS_BUILD_DEPS} - do - install_from_brew ${pkg} - done if [ ! -f ${PYTHON_VENV}/pyvenv.cfg ]; then echo "Creating Python Virtual Environment at ${PYTHON_VENV}" python3 -m venv ${PYTHON_VENV} @@ -82,9 +78,13 @@ function install_build_prerequisites { mv ccache-4.10.2-darwin/ccache /usr/local/bin/ } +function install_xsimd { + wget_and_untar https://github.com/xtensor-stack/xsimd/archive/refs/tags/${XSIMD_VERSION}.tar.gz xsimd + cmake_install xsimd +} + function install_velox_deps_from_brew { - for pkg in ${MACOS_VELOX_DEPS} - do + for pkg in ${MACOS_VELOX_DEPS}; do install_from_brew ${pkg} done } @@ -94,6 +94,11 @@ function install_fmt { cmake_install fmt -DFMT_TEST=OFF } +function install_proxygen { + wget_and_untar https://github.com/facebook/proxygen/archive/refs/tags/${FB_OS_VERSION}.tar.gz proxygen + cmake_install proxygen -DBUILD_TESTS=OFF +} + function install_folly { wget_and_untar https://github.com/facebook/folly/archive/refs/tags/${FB_OS_VERSION}.tar.gz folly cmake_install folly -DBUILD_TESTS=OFF -DFOLLY_HAVE_INT128_T=ON @@ -135,7 +140,6 @@ function install_re2 { } function install_velox_deps { - run_and_time install_velox_deps_from_brew run_and_time install_ranges_v3 run_and_time install_double_conversion run_and_time install_re2 @@ -145,12 +149,13 @@ function install_velox_deps { run_and_time install_wangle run_and_time install_mvfst run_and_time install_fbthrift + run_and_time install_xsimd + run_and_time install_proxygen } (return 2> /dev/null) && return # If script was sourced, don't run commands. ( - update_brew if [[ $# -ne 0 ]]; then for cmd in "$@"; do run_and_time "${cmd}" diff --git a/velox/functions/remote/CMakeLists.txt b/velox/functions/remote/CMakeLists.txt index a38c65894a2ff..c5f32ca662cb4 100644 --- a/velox/functions/remote/CMakeLists.txt +++ b/velox/functions/remote/CMakeLists.txt @@ -14,10 +14,22 @@ if(NOT DEFINED PROXYGEN_LIBRARIES) find_package(Sodium REQUIRED) + find_library(PROXYGEN proxygen) find_library(PROXYGEN_HTTP_SERVER proxygenhttpserver) find_library(FIZZ fizz) find_library(WANGLE wangle) + + if(NOT PROXYGEN + OR NOT PROXYGEN_HTTP_SERVER + OR NOT FIZZ + OR NOT WANGLE) + message( + FATAL_ERROR + "One or more proxygen libraries were not found. Please ensure proxygen, proxygenhttpserver, fizz, and wangle are installed." + ) + endif() + set(PROXYGEN_LIBRARIES ${PROXYGEN_HTTP_SERVER} ${PROXYGEN} ${WANGLE} ${FIZZ}) endif() diff --git a/velox/functions/remote/client/Remote.cpp b/velox/functions/remote/client/Remote.cpp index 7614a9dec662b..4f5aeafa1555d 100644 --- a/velox/functions/remote/client/Remote.cpp +++ b/velox/functions/remote/client/Remote.cpp @@ -52,8 +52,8 @@ class RemoteFunction : public exec::VectorFunction { const std::vector& inputArgs, const RemoteVectorFunctionMetadata& metadata) : functionName_(functionName), - serdeFormat_(metadata.serdeFormat), - serde_(getSerde(serdeFormat_)) { + metadata_(metadata), + serde_(getSerde(metadata_.serdeFormat)) { if (metadata.location.type() == typeid(SocketAddress)) { location_ = boost::get(metadata.location); thriftClient_ = getThriftClient(location_, &eventBase_); @@ -108,7 +108,7 @@ class RemoteFunction : public exec::VectorFunction { rows.end(), std::move(args)); - /// construct json request + // Construct JSON request folly::dynamic remoteFunctionHandle = folly::dynamic::object; remoteFunctionHandle["functionName"] = functionName_; remoteFunctionHandle["returnType"] = serializeType(outputType); @@ -118,8 +118,8 @@ class RemoteFunction : public exec::VectorFunction { } folly::dynamic inputs = folly::dynamic::object; - inputs["pageFormat"] = static_cast(serdeFormat_); - // use existing serializer(Prestopage or Sparkunsaferow) + inputs["pageFormat"] = static_cast(metadata_.serdeFormat); + // Use existing serializer (PrestoPage or SparkUnsafeRow) inputs["payload"] = iobufToString(rowVectorToIOBuf( remoteRowVector, rows.end(), *context.pool(), serde_.get())); inputs["rowCount"] = remoteRowVector->size(); @@ -129,11 +129,24 @@ class RemoteFunction : public exec::VectorFunction { jsonObject["inputs"] = inputs; jsonObject["throwOnError"] = context.throwOnError(); - // call Rest client to send request + // URL format - + // {endpoint}/v1/functions/{schema}/{functionName}/{functionId}/{version} + std::string fullUrl = fmt::format( + "{}/v1/functions/{}/{}/{}/{}", + url_.getUrl(), + metadata_.schema.value_or("default_schema"), + functionName_, + metadata_.functionId.value_or("default_function_id"), + metadata_.version.value_or("default_version")); + + // Set the full URL on the REST client. + restClient_->setUrl(fullUrl); + + // Call Rest client to send request restClient_->invoke_function(folly::toJson(jsonObject), responseBody); LOG(INFO) << responseBody; - // parse json response + // Parse JSON response auto responseJsonObj = parseJson(responseBody); if (responseJsonObj.count("err") > 0) { VELOX_NYI(responseJsonObj["err"].asString()); @@ -142,7 +155,7 @@ class RemoteFunction : public exec::VectorFunction { auto payloadIObuf = folly::IOBuf::copyBuffer( responseJsonObj["result"]["payload"].asString()); - // use existing deserializer(Prestopage or Sparkunsaferow) + // Use existing deserializer (PrestoPage or SparkUnsafeRow) auto outputRowVector = IOBufToRowVector( *payloadIObuf, ROW({outputType}), *context.pool(), serde_.get()); result = outputRowVector->childAt(0); @@ -182,7 +195,7 @@ class RemoteFunction : public exec::VectorFunction { auto requestInputs = request.inputs_ref(); requestInputs->rowCount_ref() = remoteRowVector->size(); - requestInputs->pageFormat_ref() = serdeFormat_; + requestInputs->pageFormat_ref() = metadata_.serdeFormat; // TODO: serialize only active rows. requestInputs->payload_ref() = rowVectorToIOBuf( @@ -215,12 +228,13 @@ class RemoteFunction : public exec::VectorFunction { std::unique_ptr restClient_; proxygen::URL url_; - remote::PageFormat serdeFormat_; std::unique_ptr serde_; // Structures we construct once to cache: RowTypePtr remoteInputType_; std::vector serializedInputTypes_; + + const RemoteVectorFunctionMetadata metadata_; }; std::shared_ptr createRemoteFunction( diff --git a/velox/functions/remote/client/Remote.h b/velox/functions/remote/client/Remote.h index fd90009da457f..88b5544c172be 100644 --- a/velox/functions/remote/client/Remote.h +++ b/velox/functions/remote/client/Remote.h @@ -31,8 +31,23 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata { /// SocketAddress::makeFromPath()). boost::variant location; - /// The serialization format to be used + /// The serialization format to be used when sending data to the remote. remote::PageFormat serdeFormat{remote::PageFormat::PRESTO_PAGE}; + + /// Optional schema defining the structure of the data or input/output types + /// involved in the remote function. This may include details such as column + /// names and data types. + std::optional schema; + + /// Optional identifier for the specific remote function to be invoked. + /// This can be useful when the same server hosts multiple functions, + /// and the client needs to specify which function to call. + std::optional functionId; + + /// Optional version information to be used when calling the remote function. + /// This can help in ensuring compatibility with a particular version of the + /// function if multiple versions are available on the server. + std::optional version; }; /// Registers a new remote function. It will use the meatadata defined in @@ -41,8 +56,8 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata { // /// Remote functions are registered as regular statufull functions (using the /// same internal catalog), and hence conflict if there already exists a -/// (non-remote) function registered with the same name. The `overwrite` flag -/// controls whether to overwrite in these cases. +/// (non-remote) function registered with the same name. The `overwrite` +/// flagwrite controls whether to overwrite in these cases. void registerRemoteFunction( const std::string& name, std::vector signatures,