Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](sink) The issue with 2GB limit of protocol buffer #37990

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <google/protobuf/stubs/callback.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <limits>
#include <ostream>
#include <string>
#include <utility>
Expand Down Expand Up @@ -80,6 +81,13 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul
result->set_packet_seq(packet_seq);
result->set_eos(eos);
}

/// The size limit of proto buffer message is 2G
if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 2 GB limit should be a config option.
Set it to 1 GB.

st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong());
result->clear_row_batch();
result->set_empty_batch(true);
}
st.to_protobuf(result->mutable_status());
{ done->Run(); }
delete this;
Expand Down
58 changes: 44 additions & 14 deletions be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "gutil/integral_types.h"
#include "olap/hll.h"
#include "runtime/buffer_control_block.h"
Expand Down Expand Up @@ -140,23 +141,11 @@ Status VMysqlResultWriter<is_binary_format>::_set_options(
}

template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status VMysqlResultWriter<is_binary_format>::_write_one_block(RuntimeState* state, Block& block) {
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
return status;
}

DCHECK(_output_vexpr_ctxs.empty() != true);

// Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
// failed, just return the error status
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));
auto num_rows = block.rows();
// convert one batch
auto result = std::make_unique<TFetchDataResult>();
auto num_rows = block.rows();
result->result_batch.rows.resize(num_rows);
uint64_t bytes_sent = 0;
{
Expand Down Expand Up @@ -249,6 +238,47 @@ Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& i
return status;
}

/// Executes the output expressions over `input_block` and forwards the result
/// to `_write_one_block`.
///
/// When the resulting block's byte size exceeds
/// `config::thrift_max_message_size`, the block is cut into consecutive row
/// ranges and each range is written separately. The number of ranges is
/// ceil(total_bytes / limit) and every range holds at most
/// ceil(total_rows / range_count) rows, so the split is by row count and only
/// approximates an even split by bytes.
///
/// @param state        Passed through unchanged to `_write_one_block`.
/// @param input_block  Block to send; a block with zero rows is a no-op.
/// @return `Status::OK()` on success, otherwise the first error produced by
///         expression execution or by `_write_one_block`.
template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
    SCOPED_TIMER(_append_row_batch_timer);
    if (UNLIKELY(input_block.rows() == 0)) {
        return Status::OK();
    }

    DCHECK(_output_vexpr_ctxs.empty() != true);

    // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
    // failed, just return the error status
    Block block;
    RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
                                                                       input_block, &block));

    // Read the limit exactly once so that the comparison and the divisions
    // below are guaranteed to use the same value.
    const auto configured_limit = config::thrift_max_message_size;

    // A non-positive limit cannot serve as a divisor. Treat it as "do not
    // split": this matches what the implicit unsigned comparison already did
    // for negative values and avoids a division by zero when the limit is 0.
    if (configured_limit <= 0) {
        return _write_one_block(state, block);
    }

    const auto max_bytes = static_cast<size_t>(configured_limit);
    const size_t total_bytes = block.bytes();

    if (total_bytes > max_bytes) [[unlikely]] {
        const size_t total_rows = block.rows();

        // Ceil divisions written as quotient + (remainder != 0) so they cannot
        // wrap around. `sub_block_count` is >= 2 here because
        // total_bytes > max_bytes, so it is a safe divisor.
        const size_t sub_block_count =
                total_bytes / max_bytes + (total_bytes % max_bytes != 0 ? 1 : 0);
        const size_t sub_block_rows =
                total_rows / sub_block_count + (total_rows % sub_block_count != 0 ? 1 : 0);

        size_t offset = 0;
        while (offset < total_rows) {
            // The last range may be shorter than the others.
            const size_t rows = std::min<size_t>(sub_block_rows, total_rows - offset);

            // Same column layout as `block`, with every column replaced by
            // its [offset, offset + rows) slice.
            auto sub_block = block.clone_empty();
            for (size_t i = 0; i != block.columns(); ++i) {
                sub_block.get_by_position(i).column =
                        block.get_by_position(i).column->cut(offset, rows);
            }
            offset += rows;

            RETURN_IF_ERROR(_write_one_block(state, sub_block));
        }
        return Status::OK();
    }

    return _write_one_block(state, block);
}

template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::close(Status) {
COUNTER_SET(_sent_rows_counter, _written_rows);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/sink/vmysql_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class VMysqlResultWriter final : public ResultWriter {
int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const DataTypePtr& type,
MysqlRowBuffer<is_binary_format>& buffer, int scale = -1);

Status _write_one_block(RuntimeState* state, Block& block);

BufferControlBlock* _sinker = nullptr;

const VExprContextSPtrs& _output_vexpr_ctxs;
Expand Down
Loading