Skip to content

Commit

Permalink
[native] Support different vector serdes in shuffle
Browse files Browse the repository at this point in the history
Advance the Velox version and update the Prestissimo code to support a configurable shuffle vector serde
format
Fix timing related flakiness in PrestoExchangeSourceTest.slowProducerAndEarlyTerminatingConsumer
  • Loading branch information
xiaoxmeng committed Nov 11, 2024
1 parent 12652e4 commit 14b56d8
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 95 deletions.
11 changes: 11 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"

#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
#include "presto_cpp/main/RemoteFunctionRegisterer.h"
Expand Down Expand Up @@ -1275,6 +1277,15 @@ void PrestoServer::registerVectorSerdes() {
// Registers the vector serdes used by this server. Every registration is
// guarded by an "is registered" check, so a serde that is already registered
// is left untouched and nothing is registered twice.

// Unnamed registration: install the Presto serde only when
// velox::isRegisteredVectorSerde() reports that none is registered yet.
if (!velox::isRegisteredVectorSerde()) {
velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
}
// Named registrations, one per VectorSerde::Kind referenced here
// (kPresto, kCompactRow, kUnsafeRow).
// NOTE(review): presumably these let callers look a serde up by kind (e.g. to
// choose the shuffle format) -- confirm against the Velox API.
if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kPresto)) {
velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
}
if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kCompactRow)) {
velox::serializer::CompactRowVectorSerde::registerNamedVectorSerde();
}
if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)) {
velox::serializer::spark::UnsafeRowVectorSerde::registerNamedVectorSerde();
}
}

void PrestoServer::registerFileSinks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ class ShuffleReadOperator : public Exchange {
public:
ShuffleReadOperator(
int32_t operatorId,
DriverCtx* FOLLY_NONNULL ctx,
DriverCtx* ctx,
const std::shared_ptr<const ShuffleReadNode>& shuffleReadNode,
std::shared_ptr<ExchangeClient> exchangeClient)
: Exchange(
operatorId,
ctx,
std::make_shared<core::ExchangeNode>(
shuffleReadNode->id(),
shuffleReadNode->outputType()),
shuffleReadNode->outputType(),
velox::VectorSerde::Kind::kCompactRow),
exchangeClient,
"ShuffleRead"),
serde_(std::make_unique<velox::serializer::CompactRowVectorSerde>()) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ShuffleReadNode : public velox::core::PlanNode {
// Nothing to add
}

velox::RowTypePtr outputType_;
const velox::RowTypePtr outputType_;
};

class ShuffleReadTranslator : public velox::exec::Operator::PlanNodeTranslator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
#include <boost/algorithm/string/join.hpp>
#include <folly/Uri.h>
#include "folly/init/Init.h"
#include "presto_cpp/external/json/nlohmann/json.hpp"
#include "presto_cpp/main/operators/BroadcastExchangeSource.h"
#include "presto_cpp/main/operators/BroadcastWrite.h"
#include "presto_cpp/main/operators/tests/PlanBuilder.h"
Expand Down Expand Up @@ -106,7 +104,9 @@ class BroadcastTest : public exec::test::OperatorTestBase {
const std::string& basePath,
const std::vector<std::string>& broadcastFilePaths) {
// Create plan for read node using file path.
auto readerPlan = exec::test::PlanBuilder().exchange(dataType).planNode();
auto readerPlan = exec::test::PlanBuilder()
.exchange(dataType, velox::VectorSerde::Kind::kPresto)
.planNode();
exec::test::CursorParameters broadcastReadParams;
broadcastReadParams.planNode = readerPlan;

Expand Down Expand Up @@ -211,7 +211,12 @@ class BroadcastTest : public exec::test::OperatorTestBase {
auto byteStream = std::make_unique<BufferInputStream>(std::move(ranges));

RowVectorPtr result;
VectorStreamGroup::read(byteStream.get(), pool(), dataType, &result);
VectorStreamGroup::read(
byteStream.get(),
pool(),
dataType,
velox::getNamedVectorSerde(velox::VectorSerde::Kind::kPresto),
&result);
return result;
}
};
Expand Down Expand Up @@ -352,7 +357,9 @@ TEST_F(BroadcastTest, malformedBroadcastInfoJson) {
std::string basePath = "/tmp";
std::string invalidBroadcastFilePath = "/tmp/file.bin";

auto readerPlan = exec::test::PlanBuilder().exchange(dataType).planNode();
auto readerPlan = exec::test::PlanBuilder()
.exchange(dataType, velox::VectorSerde::Kind::kPresto)
.planNode();
exec::test::CursorParameters broadcastReadParams;
broadcastReadParams.planNode = readerPlan;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ DEBUG_ONLY_TEST_P(
std::function<void(const PrestoExchangeSource*)>(
([&](const auto* prestoExchangeSource) {
allCloseCheckPassed = true;
closeWait.notifyAll();
})));
SCOPED_TESTVALUE_SET(
"facebook::presto::PrestoExchangeSource::handleDataResponse",
Expand Down Expand Up @@ -739,8 +740,9 @@ DEBUG_ONLY_TEST_P(
// all resources have been cleaned up, so explicitly waiting is the only way
// to allow the execution of background processing. We expect the test to not
// crash.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_TRUE(codePointHit);
while (!codePointHit) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
serverWrapper.stop();
}

Expand Down
Loading

0 comments on commit 14b56d8

Please sign in to comment.