Skip to content

Commit

Permalink
add proxygen
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Sep 6, 2024
1 parent 69cfcf1 commit 75ccdfe
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 29 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ projects/*
!projects/*.*
!projects/Makefile
.venv
deps-install
deps-download

#==============================================================================#
# Autotools artifacts
Expand Down
37 changes: 21 additions & 16 deletions scripts/setup-macos.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ PYTHON_VENV=${PYHTON_VENV:-"${SCRIPTDIR}/../.venv"}
NPROC=$(getconf _NPROCESSORS_ONLN)

DEPENDENCY_DIR=${DEPENDENCY_DIR:-$(pwd)}
MACOS_VELOX_DEPS="bison boost double-conversion flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 simdjson snappy thrift xz xsimd zstd"
MACOS_VELOX_DEPS="bison boost double-conversion flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 simdjson snappy thrift xz zstd"
MACOS_VELOX_DEPS="bison flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 snappy xz zstd"
MACOS_BUILD_DEPS="ninja cmake"
FB_OS_VERSION="v2024.05.20.00"
FMT_VERSION="10.1.1"
XSIMD_VERSION="10.0.0"

function update_brew {
DEFAULT_BREW_PATH=/usr/local/bin/brew
if [ `arch` == "arm64" ] ;
then
DEFAULT_BREW_PATH=$(which brew) ;
if [ "$(arch)" == "arm64" ]; then
DEFAULT_BREW_PATH=$(which brew)
fi
BREW_PATH=${BREW_PATH:-$DEFAULT_BREW_PATH}
$BREW_PATH update --auto-update --verbose
Expand All @@ -53,25 +54,20 @@ function update_brew {

function install_from_brew {
pkg=$1
if [[ "${pkg}" =~ ^([0-9a-z-]*):([0-9](\.[0-9\])*)$ ]];
then
if [[ "${pkg}" =~ ^([0-9a-z-]*):([0-9](\.[0-9\])*)$ ]]; then
pkg=${BASH_REMATCH[1]}
ver=${BASH_REMATCH[2]}
echo "Installing '${pkg}' at '${ver}'"
tap="velox/local-${pkg}"
brew tap-new "${tap}"
brew extract "--version=${ver}" "${pkg}" "${tap}"
brew install "${tap}/${pkg}@${ver}" || ( echo "Failed to install ${tap}/${pkg}@${ver}" ; exit 1 )
brew install "${tap}/${pkg}@${ver}" || { echo "Failed to install ${tap}/${pkg}@${ver}"; exit 1; }
else
( brew install --formula "${pkg}" && echo "Installation of ${pkg} is successful" || brew upgrade --formula "$pkg" ) || ( echo "Failed to install ${pkg}" ; exit 1 )
(brew install --formula "${pkg}" && echo "Installation of ${pkg} is successful") || brew upgrade --formula "${pkg}" || { echo "Failed to install ${pkg}"; exit 1; }
fi
}

function install_build_prerequisites {
for pkg in ${MACOS_BUILD_DEPS}
do
install_from_brew ${pkg}
done
if [ ! -f ${PYTHON_VENV}/pyvenv.cfg ]; then
echo "Creating Python Virtual Environment at ${PYTHON_VENV}"
python3 -m venv ${PYTHON_VENV}
Expand All @@ -82,9 +78,13 @@ function install_build_prerequisites {
mv ccache-4.10.2-darwin/ccache /usr/local/bin/
}

function install_xsimd {
wget_and_untar https://github.com/xtensor-stack/xsimd/archive/refs/tags/${XSIMD_VERSION}.tar.gz xsimd
cmake_install xsimd
}

function install_velox_deps_from_brew {
for pkg in ${MACOS_VELOX_DEPS}
do
for pkg in ${MACOS_VELOX_DEPS}; do
install_from_brew ${pkg}
done
}
Expand All @@ -94,6 +94,11 @@ function install_fmt {
cmake_install fmt -DFMT_TEST=OFF
}

function install_proxygen {
wget_and_untar https://github.com/facebook/proxygen/archive/refs/tags/${FB_OS_VERSION}.tar.gz proxygen
cmake_install proxygen -DBUILD_TESTS=OFF
}

function install_folly {
wget_and_untar https://github.com/facebook/folly/archive/refs/tags/${FB_OS_VERSION}.tar.gz folly
cmake_install folly -DBUILD_TESTS=OFF -DFOLLY_HAVE_INT128_T=ON
Expand Down Expand Up @@ -135,7 +140,6 @@ function install_re2 {
}

function install_velox_deps {
run_and_time install_velox_deps_from_brew
run_and_time install_ranges_v3
run_and_time install_double_conversion
run_and_time install_re2
Expand All @@ -145,12 +149,13 @@ function install_velox_deps {
run_and_time install_wangle
run_and_time install_mvfst
run_and_time install_fbthrift
run_and_time install_xsimd
run_and_time install_proxygen
}

(return 2> /dev/null) && return # If script was sourced, don't run commands.

(
update_brew
if [[ $# -ne 0 ]]; then
for cmd in "$@"; do
run_and_time "${cmd}"
Expand Down
12 changes: 12 additions & 0 deletions velox/functions/remote/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,22 @@

if(NOT DEFINED PROXYGEN_LIBRARIES)
find_package(Sodium REQUIRED)

find_library(PROXYGEN proxygen)
find_library(PROXYGEN_HTTP_SERVER proxygenhttpserver)
find_library(FIZZ fizz)
find_library(WANGLE wangle)

if(NOT PROXYGEN
OR NOT PROXYGEN_HTTP_SERVER
OR NOT FIZZ
OR NOT WANGLE)
message(
FATAL_ERROR
"One or more proxygen libraries were not found. Please ensure proxygen, proxygenhttpserver, fizz, and wangle are installed."
)
endif()

set(PROXYGEN_LIBRARIES ${PROXYGEN_HTTP_SERVER} ${PROXYGEN} ${WANGLE} ${FIZZ})
endif()

Expand Down
34 changes: 24 additions & 10 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class RemoteFunction : public exec::VectorFunction {
const std::vector<exec::VectorFunctionArg>& inputArgs,
const RemoteVectorFunctionMetadata& metadata)
: functionName_(functionName),
serdeFormat_(metadata.serdeFormat),
serde_(getSerde(serdeFormat_)) {
metadata_(metadata),
serde_(getSerde(metadata_.serdeFormat)) {
if (metadata.location.type() == typeid(SocketAddress)) {
location_ = boost::get<SocketAddress>(metadata.location);
thriftClient_ = getThriftClient(location_, &eventBase_);
Expand Down Expand Up @@ -108,7 +108,7 @@ class RemoteFunction : public exec::VectorFunction {
rows.end(),
std::move(args));

/// construct json request
// Construct JSON request
folly::dynamic remoteFunctionHandle = folly::dynamic::object;
remoteFunctionHandle["functionName"] = functionName_;
remoteFunctionHandle["returnType"] = serializeType(outputType);
Expand All @@ -118,8 +118,8 @@ class RemoteFunction : public exec::VectorFunction {
}

folly::dynamic inputs = folly::dynamic::object;
inputs["pageFormat"] = static_cast<int>(serdeFormat_);
// use existing serializer(Prestopage or Sparkunsaferow)
inputs["pageFormat"] = static_cast<int>(metadata_.serdeFormat);
// Use existing serializer (PrestoPage or SparkUnsafeRow)
inputs["payload"] = iobufToString(rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), serde_.get()));
inputs["rowCount"] = remoteRowVector->size();
Expand All @@ -129,11 +129,24 @@ class RemoteFunction : public exec::VectorFunction {
jsonObject["inputs"] = inputs;
jsonObject["throwOnError"] = context.throwOnError();

// call Rest client to send request
// URL format -
// {endpoint}/v1/functions/{schema}/{functionName}/{functionId}/{version}
std::string fullUrl = fmt::format(
"{}/v1/functions/{}/{}/{}/{}",
url_.getUrl(),
metadata_.schema.value_or("default_schema"),
functionName_,
metadata_.functionId.value_or("default_function_id"),
metadata_.version.value_or("default_version"));

// Set the full URL on the REST client.
restClient_->setUrl(fullUrl);

// Call Rest client to send request
restClient_->invoke_function(folly::toJson(jsonObject), responseBody);
LOG(INFO) << responseBody;

// parse json response
// Parse JSON response
auto responseJsonObj = parseJson(responseBody);
if (responseJsonObj.count("err") > 0) {
VELOX_NYI(responseJsonObj["err"].asString());
Expand All @@ -142,7 +155,7 @@ class RemoteFunction : public exec::VectorFunction {
auto payloadIObuf = folly::IOBuf::copyBuffer(
responseJsonObj["result"]["payload"].asString());

// use existing deserializer(Prestopage or Sparkunsaferow)
// Use existing deserializer (PrestoPage or SparkUnsafeRow)
auto outputRowVector = IOBufToRowVector(
*payloadIObuf, ROW({outputType}), *context.pool(), serde_.get());
result = outputRowVector->childAt(0);
Expand Down Expand Up @@ -182,7 +195,7 @@ class RemoteFunction : public exec::VectorFunction {

auto requestInputs = request.inputs_ref();
requestInputs->rowCount_ref() = remoteRowVector->size();
requestInputs->pageFormat_ref() = serdeFormat_;
requestInputs->pageFormat_ref() = metadata_.serdeFormat;

// TODO: serialize only active rows.
requestInputs->payload_ref() = rowVectorToIOBuf(
Expand Down Expand Up @@ -215,12 +228,13 @@ class RemoteFunction : public exec::VectorFunction {
std::unique_ptr<RestClient> restClient_;
proxygen::URL url_;

remote::PageFormat serdeFormat_;
std::unique_ptr<VectorSerde> serde_;

// Structures we construct once to cache:
RowTypePtr remoteInputType_;
std::vector<std::string> serializedInputTypes_;

const RemoteVectorFunctionMetadata metadata_;
};

std::shared_ptr<exec::VectorFunction> createRemoteFunction(
Expand Down
21 changes: 18 additions & 3 deletions velox/functions/remote/client/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,23 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
/// SocketAddress::makeFromPath()).
boost::variant<folly::SocketAddress, proxygen::URL> location;

/// The serialization format to be used
/// The serialization format to be used when sending data to the remote.
remote::PageFormat serdeFormat{remote::PageFormat::PRESTO_PAGE};

/// Optional schema defining the structure of the data or input/output types
/// involved in the remote function. This may include details such as column
/// names and data types.
std::optional<std::string> schema;

/// Optional identifier for the specific remote function to be invoked.
/// This can be useful when the same server hosts multiple functions,
/// and the client needs to specify which function to call.
std::optional<std::string> functionId;

/// Optional version information to be used when calling the remote function.
/// This can help in ensuring compatibility with a particular version of the
/// function if multiple versions are available on the server.
std::optional<std::string> version;
};

/// Registers a new remote function. It will use the meatadata defined in
Expand All @@ -41,8 +56,8 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
//
/// Remote functions are registered as regular statufull functions (using the
/// same internal catalog), and hence conflict if there already exists a
/// (non-remote) function registered with the same name. The `overwrite` flag
/// controls whether to overwrite in these cases.
/// (non-remote) function registered with the same name. The `overwrite`
/// flagwrite controls whether to overwrite in these cases.
void registerRemoteFunction(
const std::string& name,
std::vector<exec::FunctionSignaturePtr> signatures,
Expand Down

0 comments on commit 75ccdfe

Please sign in to comment.