Skip to content

Commit

Permalink
[native] Add batch read exchange source creation
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Dec 7, 2022
1 parent 7d22bc0 commit b111eaa
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 2 deletions.
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "presto_cpp/main/http/HttpServer.h"
#include "presto_cpp/main/operators/LocalPersistentShuffle.h"
#include "presto_cpp/main/operators/ShuffleInterface.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "presto_cpp/presto_protocol/Connectors.h"
#include "presto_cpp/presto_protocol/WriteProtocol.h"
#include "presto_cpp/presto_protocol/presto_protocol.h"
Expand Down Expand Up @@ -242,6 +243,8 @@ void PrestoServer::run() {

facebook::velox::exec::ExchangeSource::registerFactory(
PrestoExchangeSource::createExchangeSource);
facebook::velox::exec::ExchangeSource::registerFactory(
operators::UnsafeRowExchangeSource::createExchangeSource);

velox::dwrf::registerDwrfReaderFactory();
#ifdef PRESTO_ENABLE_PARQUET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fmt/format.h>
#include <folly/Uri.h>

#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"

namespace facebook::presto::operators {
Expand All @@ -36,4 +40,45 @@ void UnsafeRowExchangeSource::request() {
std::move(ioBuf), pool_, [buffer](auto&) {}));
}

namespace {
/// Looks up the "shuffleInfo" query parameter of 'uri'.
///
/// Walks the query parameters in the order Uri::getQueryParams() yields them
/// and returns a copy of the value of the first one whose key is exactly
/// "shuffleInfo". Returns std::nullopt when no parameter has that key.
std::optional<std::string> getSerializedShuffleInfo(folly::Uri& uri) {
  std::optional<std::string> shuffleInfo;
  for (const auto& [key, value] : uri.getQueryParams()) {
    if (key == "shuffleInfo") {
      // First match wins; later duplicates of the key are ignored.
      shuffleInfo = value;
      break;
    }
  }
  return shuffleInfo;
}
} // namespace

// static
std::unique_ptr<velox::exec::ExchangeSource>
UnsafeRowExchangeSource::createExchangeSource(
const std::string& url,
int32_t destination,
std::shared_ptr<velox::exec::ExchangeQueue> queue,
velox::memory::MemoryPool* FOLLY_NONNULL pool) {
if (::strncmp(url.c_str(), "batch://", 8) != 0) {
return nullptr;
}
auto shuffleName = SystemConfig::instance()->shuffleName();
VELOX_CHECK(
!shuffleName.empty(),
"shuffle.name is not provided in config.properties to create a shuffle "
"interface.");
auto& shuffleFactory = ShuffleInterface::factory(shuffleName);
auto uri = folly::Uri(url);
auto serializedShuffleInfo = getSerializedShuffleInfo(uri);
VELOX_USER_CHECK(
serializedShuffleInfo.has_value(),
"Cannot find shuffleInfo parameter in split url '{}'",
url);
return std::make_unique<UnsafeRowExchangeSource>(
uri.host(),
destination,
std::move(queue),
shuffleFactory(
serializedShuffleInfo.value(), ShuffleInterface::Type::kRead, pool),
pool);
}
}; // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {

void close() override {}

/// Creates an UnsafeRowExchangeSource from a split url.
///
/// 'url' must have the following format:
///   batch://<taskid>?shuffleInfo=<serialized-shuffle-info>
///
/// Returns nullptr if 'url' does not start with "batch://". Otherwise a
/// read-type shuffle is created from the 'shuffleInfo' query parameter using
/// the shuffle factory registered under the configured shuffle name, and the
/// new exchange source is returned. Fails a check if the shuffle name is not
/// configured or if 'url' has no 'shuffleInfo' query parameter.
///
/// @param url Split url in the format described above.
/// @param destination Passed through to the created exchange source.
/// @param queue Exchange queue handed over to the created exchange source.
/// @param pool Memory pool passed to both the shuffle factory and the created
/// exchange source.
static std::unique_ptr<velox::exec::ExchangeSource> createExchangeSource(
const std::string& url,
int32_t destination,
std::shared_ptr<velox::exec::ExchangeQueue> queue,
velox::memory::MemoryPool* FOLLY_NONNULL pool);

private:
const std::shared_ptr<ShuffleInterface> shuffle_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void registerExchangeSource(const std::shared_ptr<ShuffleInterface>& shuffle) {
std::shared_ptr<exec::ExchangeQueue> queue,
memory::MemoryPool* FOLLY_NONNULL pool)
-> std::unique_ptr<exec::ExchangeSource> {
if (strncmp(taskId.c_str(), "spark://", 8) == 0) {
if (strncmp(taskId.c_str(), "batch://", 8) == 0) {
return std::make_unique<UnsafeRowExchangeSource>(
taskId, destination, std::move(queue), shuffle, pool);
}
Expand Down Expand Up @@ -189,7 +189,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
}

/// Builds a task id of the form "batch://<prefix>-<num>".
///
/// The "batch://" scheme is the prefix the exchange source factory checks
/// for before creating an UnsafeRowExchangeSource, so ids produced here are
/// routed to that source.
static std::string makeTaskId(const std::string& prefix, int num) {
  return fmt::format("batch://{}-{}", prefix, num);
}

std::shared_ptr<exec::Task> makeTask(
Expand Down

0 comments on commit b111eaa

Please sign in to comment.