Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support compress UnsafeRow and CompactRow #11497

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,38 @@

namespace facebook::velox::exec {

namespace {
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
VectorSerde::Kind kind) {
std::unique_ptr<VectorSerde::Options> options =
kind == VectorSerde::Kind::kPresto
? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
: std::make_unique<VectorSerde::Options>();
options->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
return options;
}
} // namespace

Exchange::Exchange(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::ExchangeNode>& exchangeNode,
std::shared_ptr<ExchangeClient> exchangeClient,
const std::string& operatorType)
: SourceOperator(
driverCtx,
exchangeNode->outputType(),
operatorId,
exchangeNode->id(),
operatorType),
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
serdeKind_(exchangeNode->serdeKind()),
options_{getVectorSerdeOptions(serdeKind_)},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}

void Exchange::addTaskIds(std::vector<std::string>& taskIds) {
std::shuffle(std::begin(taskIds), std::end(taskIds), rng_);
for (const std::string& taskId : taskIds) {
Expand Down Expand Up @@ -127,7 +159,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
&options_);
options_.get());
resultOffset = result_->size();
}
}
Expand All @@ -154,7 +186,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
&options_);
options_.get());
// We expect the row-wise deserialization to consume all the input into one
// output vector.
VELOX_CHECK(inputStream->atEnd());
Expand Down
40 changes: 14 additions & 26 deletions velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,7 @@ class Exchange : public SourceOperator {
DriverCtx* driverCtx,
const std::shared_ptr<const core::ExchangeNode>& exchangeNode,
std::shared_ptr<ExchangeClient> exchangeClient,
const std::string& operatorType = "Exchange")
: SourceOperator(
driverCtx,
exchangeNode->outputType(),
operatorId,
exchangeNode->id(),
operatorType),
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
serdeKind_(exchangeNode->serdeKind()),
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
}
const std::string& operatorType = "Exchange");

~Exchange() override {
close();
Expand All @@ -76,16 +62,16 @@ class Exchange : public SourceOperator {
private:
// Invoked to create exchange client for remote tasks.
// The function shuffles the source task ids first to randomize the source
// tasks we fetch data from. This helps to avoid different tasks fetching from
// the same source task in a distributed system.
// tasks we fetch data from. This helps to avoid different tasks fetching
// from the same source task in a distributed system.
void addTaskIds(std::vector<std::string>& taskIds);

/// Fetches splits from the task until there are no more splits or task
/// returns a future that will be complete when more splits arrive. Adds
/// splits to exchangeClient_. Returns true if received a future from the task
/// and sets the 'future' parameter. Returns false if fetched all splits or if
/// this operator is not the first operator in the pipeline and therefore is
/// not responsible for fetching splits and adding them to the
/// splits to exchangeClient_. Returns true if received a future from the
/// task and sets the 'future' parameter. Returns false if fetched all
/// splits or if this operator is not the first operator in the pipeline and
/// therefore is not responsible for fetching splits and adding them to the
/// exchangeClient_.
bool getSplits(ContinueFuture* future);

Expand All @@ -97,16 +83,19 @@ class Exchange : public SourceOperator {

const VectorSerde::Kind serdeKind_;

/// True if this operator is responsible for fetching splits from the Task and
/// passing these to ExchangeClient.
const std::unique_ptr<VectorSerde::Options> options_;

/// True if this operator is responsible for fetching splits from the Task
/// and passing these to ExchangeClient.
const bool processSplits_;

bool noMoreSplits_ = false;

std::shared_ptr<ExchangeClient> exchangeClient_;

/// A future received from Task::getSplitOrFuture(). It will be complete when
/// there are more splits available or no-more-splits signal has arrived.
/// A future received from Task::getSplitOrFuture(). It will be complete
/// when there are more splits available or no-more-splits signal has
/// arrived.
ContinueFuture splitFuture_{ContinueFuture::makeEmpty()};

// Reusable result vector.
Expand All @@ -115,7 +104,6 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
serializer::presto::PrestoVectorSerde::PrestoOptions options_;
};

} // namespace facebook::velox::exec
1 change: 1 addition & 0 deletions velox/exec/OperatorTraceReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class OperatorTraceInputReader {
const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{
true,
common::CompressionKind_ZSTD, // TODO: Use trace config.
0.8,
/*_nullsFirst=*/true};
const std::shared_ptr<filesystems::FileSystem> fs_;
const RowTypePtr dataType_;
Expand Down
1 change: 1 addition & 0 deletions velox/exec/OperatorTraceWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class OperatorTraceInputWriter {
const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = {
true,
common::CompressionKind::CompressionKind_ZSTD,
0.8,
/*nullsFirst=*/true};
const std::shared_ptr<filesystems::FileSystem> fs_;
memory::MemoryPool* const pool_;
Expand Down
46 changes: 36 additions & 10 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,39 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
namespace {
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
VectorSerde::Kind kind) {
std::unique_ptr<VectorSerde::Options> options =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if () {
} else {
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ternary conditional operator can get the VectorSerde::Options, then set the field of options, otherwise, we must set in each branch.

kind == VectorSerde::Kind::kPresto
? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
: std::make_unique<VectorSerde::Options>();
options->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
return options;
}
} // namespace

namespace detail {
Destination::Destination(
const std::string& taskId,
int destination,
VectorSerde* serde,
VectorSerde::Options* options,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
options_(options),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
setTargetSizePct();
}

BlockingReason Destination::advance(
uint64_t maxBytes,
const std::vector<vector_size_t>& sizes,
Expand Down Expand Up @@ -57,15 +89,7 @@ BlockingReason Destination::advance(
if (current_ == nullptr) {
current_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
const auto rowType = asRowType(output->type());
if (serde_->kind() == VectorSerde::Kind::kPresto) {
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
} else {
current_->createStreamTree(rowType, rowsInCurrent_);
}
current_->createStreamTree(rowType, rowsInCurrent_, options_);
}

const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow);
Expand Down Expand Up @@ -175,7 +199,8 @@ PartitionedOutput::PartitionedOutput(
->queryConfig()
.maxPartitionedOutputBufferSize()),
eagerFlush_(eagerFlush),
serde_(getNamedVectorSerde(planNode->serdeKind())) {
serde_(getNamedVectorSerde(planNode->serdeKind())),
options_(getVectorSerdeOptions(planNode->serdeKind())) {
if (!planNode->isPartitioned()) {
VELOX_USER_CHECK_EQ(numDestinations_, 1);
}
Expand Down Expand Up @@ -231,6 +256,7 @@ void PartitionedOutput::initializeDestinations() {
taskId,
i,
serde_,
options_.get(),
pool(),
eagerFlush_,
[&](uint64_t bytes, uint64_t rows) {
Expand Down
13 changes: 4 additions & 9 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,10 @@ class Destination {
const std::string& taskId,
int destination,
VectorSerde* serde,
VectorSerde::Options* options,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
setTargetSizePct();
}
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued);

/// Resets the destination before starting a new batch.
void beginBatch() {
Expand Down Expand Up @@ -112,6 +105,7 @@ class Destination {
const std::string taskId_;
const int destination_;
VectorSerde* const serde_;
VectorSerde::Options* const options_;
memory::MemoryPool* const pool_;
const bool eagerFlush_;
const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
Expand Down Expand Up @@ -226,6 +220,7 @@ class PartitionedOutput : public Operator {
const int64_t maxBufferedBytes_;
const bool eagerFlush_;
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> options_;

BlockingReason blockingReason_{BlockingReason::kNotBlocked};
ContinueFuture future_;
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/SpillFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ uint64_t SpillWriter::write(
NanosecondTimer timer(&timeNs);
if (batch_ == nullptr) {
serializer::presto::PrestoVectorSerde::PrestoOptions options = {
kDefaultUseLosslessTimestamp, compressionKind_, true /*nullsFirst*/};
kDefaultUseLosslessTimestamp,
compressionKind_,
0.8,
/*nullsFirst=*/true};
batch_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
batch_->createStreamTree(
std::static_pointer_cast<const RowType>(rows->type()),
Expand Down Expand Up @@ -300,6 +303,7 @@ SpillReadFile::SpillReadFile(
readOptions_{
kDefaultUseLosslessTimestamp,
compressionKind_,
0.8,
/*nullsFirst=*/true},
pool_(pool),
serde_(getNamedVectorSerde(VectorSerde::Kind::kPresto)),
Expand Down
10 changes: 3 additions & 7 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2408,22 +2408,18 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
test(100'000, 1);
} else if (GetParam() == VectorSerde::Kind::kCompactRow) {
test(1, 1'000);
test(1'000, 28);
test(10'000, 3);
test(1'000, 38);
test(10'000, 4);
test(100'000, 1);
} else {
test(1, 1'000);
test(1'000, 63);
test(1'000, 72);
test(10'000, 7);
test(100'000, 1);
}
}

TEST_P(MultiFragmentTest, compression) {
// NOTE: only presto format supports compression for now
if (GetParam() != VectorSerde::Kind::kPresto) {
return;
}
bufferManager_->testingSetCompression(
common::CompressionKind::CompressionKind_LZ4);
auto guard = folly::makeGuard([&]() {
Expand Down
15 changes: 9 additions & 6 deletions velox/serializers/CompactRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ using TRowSize = uint32_t;

class CompactRowVectorSerializer : public RowSerializer<row::CompactRow> {
public:
explicit CompactRowVectorSerializer(memory::MemoryPool* pool)
: RowSerializer<row::CompactRow>(pool) {}
explicit CompactRowVectorSerializer(
memory::MemoryPool* pool,
const VectorSerde::Options* options)
: RowSerializer<row::CompactRow>(pool, options) {}

private:
void serializeRanges(
Expand Down Expand Up @@ -74,20 +76,21 @@ CompactRowVectorSerde::createIterativeSerializer(
RowTypePtr /* type */,
int32_t /* numRows */,
StreamArena* streamArena,
const Options* /* options */) {
return std::make_unique<CompactRowVectorSerializer>(streamArena->pool());
const Options* options) {
return std::make_unique<CompactRowVectorSerializer>(
streamArena->pool(), options);
}

void CompactRowVectorSerde::deserialize(
ByteInputStream* source,
velox::memory::MemoryPool* pool,
RowTypePtr type,
RowVectorPtr* result,
const Options* /* options */) {
const Options* options) {
std::vector<std::string_view> serializedRows;
std::vector<std::unique_ptr<std::string>> serializedBuffers;
RowDeserializer<std::string_view>::deserialize(
source, serializedRows, serializedBuffers);
source, serializedRows, serializedBuffers, options);

if (serializedRows.empty()) {
*result = BaseVector::create<RowVector>(type, 0, pool);
Expand Down
15 changes: 0 additions & 15 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4230,21 +4230,6 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
}

private:
struct CompressionStats {
// Number of times compression was not attempted.
int32_t numCompressionSkipped{0};

// uncompressed size for which compression was attempted.
int64_t compressionInputBytes{0};

// Compressed bytes.
int64_t compressedBytes{0};

// Bytes for which compression was not attempted because of past
// non-performance.
int64_t compressionSkippedBytes{0};
};

const SerdeOpts opts_;
StreamArena* const streamArena_;
const std::unique_ptr<folly::io::Codec> codec_;
Expand Down
8 changes: 2 additions & 6 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ class PrestoVectorSerde : public VectorSerde {
PrestoOptions(
bool _useLosslessTimestamp,
common::CompressionKind _compressionKind,
float _minCompressionRatio = 0.8,
bool _nullsFirst = false,
bool _preserveEncodings = false)
: VectorSerde::Options(_compressionKind),
: VectorSerde::Options(_compressionKind, _minCompressionRatio),
useLosslessTimestamp(_useLosslessTimestamp),
nullsFirst(_nullsFirst),
preserveEncodings(_preserveEncodings) {}
Expand All @@ -74,11 +75,6 @@ class PrestoVectorSerde : public VectorSerde {
/// structs.
bool nullsFirst{false};

/// Minimum achieved compression if compression is enabled. Compressing less
/// than this causes subsequent compression attempts to be skipped. The more
/// times compression misses the target the less frequently it is tried.
float minCompressionRatio{0.8};

/// If true, the serializer will not employ any optimizations that can
/// affect the encoding of the input vectors. This is only relevant when
/// using BatchVectorSerializer.
Expand Down
Loading
Loading