Skip to content

Commit

Permalink
[feature](agg) Support spill to disk in aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Mar 25, 2023
1 parent dc4b719 commit f2eb71c
Show file tree
Hide file tree
Showing 12 changed files with 498 additions and 41 deletions.
2 changes: 1 addition & 1 deletion be/src/runtime/block_spill_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Status BlockSpillManager::get_reader(int64_t stream_id, vectorized::BlockSpillRe
std::string path;
{
std::lock_guard<std::mutex> l(lock_);
DCHECK(id_to_file_paths_.end() != id_to_file_paths_.find(stream_id));
CHECK(id_to_file_paths_.end() != id_to_file_paths_.find(stream_id));
path = id_to_file_paths_[stream_id];
}
reader.reset(new vectorized::BlockSpillReader(stream_id, path, profile, delete_after_read));
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,12 @@ class RuntimeState {
return 0;
}

int64_t external_agg_bytes_threshold() const {
return _query_options.__isset.external_agg_bytes_threshold
? _query_options.external_agg_bytes_threshold
: 0;
}

private:
Status create_error_log_file();

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/common/hash_table/fixed_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class FixedHashTable : private boost::noncopyable,
free();

std::swap(buf, rhs.buf);
this->setSize(rhs.size());
this->set_size(rhs.size());

Allocator::operator=(std::move(rhs));
Cell::State::operator=(std::move(rhs));
Expand Down
11 changes: 11 additions & 0 deletions be/src/vec/common/hash_table/string_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,19 @@ class StringHashTable : private boost::noncopyable {
m3(std::move(rhs.m3)),
ms(std::move(rhs.ms)) {}

StringHashTable& operator=(StringHashTable&& other) {
std::swap(m0, other.m0);
std::swap(m1, other.m1);
std::swap(m2, other.m2);
std::swap(m3, other.m3);
std::swap(ms, other.ms);
return *this;
}

~StringHashTable() = default;

size_t hash(doris::StringRef key) { return StringHashTableHash()(key); }

// Dispatch is written in a way that maximizes the performance:
// 1. Always memcpy 8 times bytes
// 2. Use switch case extension to generate fast dispatching table
Expand Down
32 changes: 23 additions & 9 deletions be/src/vec/core/block_spill_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ namespace vectorized {
void BlockSpillReader::_init_profile() {
read_time_ = ADD_TIMER(profile_, "ReadTime");
deserialize_time_ = ADD_TIMER(profile_, "DeserializeTime");
read_bytes_ = ADD_COUNTER(profile_, "ReadBytes", TUnit::BYTES);
read_block_num_ = ADD_COUNTER(profile_, "ReadBlockNum", TUnit::UNIT);
}

Status BlockSpillReader::open() {
Expand Down Expand Up @@ -76,6 +78,12 @@ Status BlockSpillReader::open() {
return Status::OK();
}

void BlockSpillReader::seek(size_t block_index) {
CHECK(file_reader_);
CHECK_LT(block_index, block_count_);
read_block_index_ = block_index;
}

// The returned block is owned by BlockSpillReader and is
// destroyed when reading next block.
Status BlockSpillReader::read(Block* block, bool* eos) {
Expand All @@ -99,17 +107,23 @@ Status BlockSpillReader::read(Block* block, bool* eos) {
&bytes_read));
}
DCHECK(bytes_read == bytes_to_read);

PBlock pb_block;
BlockUPtr new_block = nullptr;
{
SCOPED_TIMER(deserialize_time_);
if (!pb_block.ParseFromArray(result.data, result.size)) {
return Status::InternalError("Failed to read spilled block");
COUNTER_UPDATE(read_bytes_, bytes_read);
COUNTER_UPDATE(read_block_num_, 1);

if (bytes_read > 0) {
PBlock pb_block;
BlockUPtr new_block = nullptr;
{
SCOPED_TIMER(deserialize_time_);
if (!pb_block.ParseFromArray(result.data, result.size)) {
return Status::InternalError("Failed to read spilled block");
}
new_block.reset(new Block(pb_block));
}
new_block.reset(new Block(pb_block));
block->swap(*new_block);
} else {
block->clear_column_data();
}
block->swap(*new_block);

++read_block_index_;

Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/core/block_spill_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ class BlockSpillReader {

Status read(Block* block, bool* eos);

void seek(size_t block_index);

int64_t get_id() const { return stream_id_; }

std::string get_path() const { return file_path_; }

size_t block_count() const { return block_count_; }

private:
void _init_profile();

Expand All @@ -63,6 +67,8 @@ class BlockSpillReader {
RuntimeProfile* profile_ = nullptr;
RuntimeProfile::Counter* read_time_;
RuntimeProfile::Counter* deserialize_time_;
RuntimeProfile::Counter* read_bytes_;
RuntimeProfile::Counter* read_block_num_;
};

using BlockSpillReaderUPtr = std::unique_ptr<BlockSpillReader>;
Expand Down
43 changes: 22 additions & 21 deletions be/src/vec/core/block_spill_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ void BlockSpillWriter::_init_profile() {
write_bytes_counter_ = ADD_COUNTER(profile_, "WriteBytes", TUnit::BYTES);
write_timer_ = ADD_TIMER(profile_, "WriteTime");
serialize_timer_ = ADD_TIMER(profile_, "SerializeTime");
write_blocks_num_ = ADD_COUNTER(profile_, "WriteBlockNum", TUnit::UNIT);
}

Status BlockSpillWriter::open() {
Expand Down Expand Up @@ -67,10 +68,6 @@ Status BlockSpillWriter::close() {

Status BlockSpillWriter::write(const Block& block) {
auto rows = block.rows();
if (0 == rows) {
return Status::OK();
}

// file format: block1, block2, ..., blockn, meta
if (rows <= batch_size_) {
return _write_internal(block);
Expand Down Expand Up @@ -107,33 +104,37 @@ Status BlockSpillWriter::_write_internal(const Block& block) {
Status status;
std::string buff;

PBlock pblock;
{
SCOPED_TIMER(serialize_timer_);
status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock,
&uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::LZ4);
if (block.rows() > 0) {
PBlock pblock;
{
SCOPED_TIMER(serialize_timer_);
status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock,
&uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::LZ4);
if (!status.ok()) {
unlink(file_path_.c_str());
return status;
}
pblock.SerializeToString(&buff);
}

{
SCOPED_TIMER(write_timer_);
status = file_writer_->append(buff);
written_bytes = buff.size();
}

if (!status.ok()) {
unlink(file_path_.c_str());
return status;
}
pblock.SerializeToString(&buff);
}

{
SCOPED_TIMER(write_timer_);
status = file_writer_->append(buff);
written_bytes = buff.size();
}
if (!status.ok()) {
unlink(file_path_.c_str());
return status;
}

max_sub_block_size_ = std::max(max_sub_block_size_, written_bytes);

meta_.append((const char*)&total_written_bytes_, sizeof(size_t));
COUNTER_UPDATE(write_bytes_counter_, written_bytes);
COUNTER_UPDATE(write_blocks_num_, 1);
total_written_bytes_ += written_bytes;
++written_blocks_;

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/core/block_spill_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class BlockSpillWriter {
RuntimeProfile::Counter* write_bytes_counter_;
RuntimeProfile::Counter* serialize_timer_;
RuntimeProfile::Counter* write_timer_;
RuntimeProfile::Counter* write_blocks_num_;
};

using BlockSpillWriterUPtr = std::unique_ptr<BlockSpillWriter>;
Expand Down
Loading

0 comments on commit f2eb71c

Please sign in to comment.