diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 845afb9a84b85c..6f8022a00342a3 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -24,6 +24,7 @@ #include // IWYU pragma: no_include #include // IWYU pragma: keep +#include #include #include #include @@ -80,6 +81,13 @@ void GetResultBatchCtx::on_data(const std::unique_ptr& t_resul result->set_packet_seq(packet_seq); result->set_eos(eos); } + + /// The size limit of proto buffer message is 2G + if (result->ByteSizeLong() > std::numeric_limits::max()) { + st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong()); + result->clear_row_batch(); + result->set_empty_batch(true); + } st.to_protobuf(result->mutable_status()); { done->Run(); } delete this; diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index d700d43165d27a..7fcc7fcf76f604 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -30,6 +30,7 @@ #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "gutil/integral_types.h" #include "olap/hll.h" #include "runtime/buffer_control_block.h" @@ -140,23 +141,11 @@ Status VMysqlResultWriter::_set_options( } template -Status VMysqlResultWriter::write(RuntimeState* state, Block& input_block) { - SCOPED_TIMER(_append_row_batch_timer); +Status VMysqlResultWriter::_write_one_block(RuntimeState* state, Block& block) { Status status = Status::OK(); - if (UNLIKELY(input_block.rows() == 0)) { - return status; - } - - DCHECK(_output_vexpr_ctxs.empty() != true); - - // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec - // failed, just return the error status - Block block; - RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, - input_block, &block)); + auto num_rows = block.rows(); // convert one batch auto result = std::make_unique(); - auto num_rows = block.rows(); result->result_batch.rows.resize(num_rows); uint64_t bytes_sent = 0; { @@ -249,6 +238,47 @@ Status VMysqlResultWriter::write(RuntimeState* state, Block& i return status; } +template +Status VMysqlResultWriter::write(RuntimeState* state, Block& input_block) { + SCOPED_TIMER(_append_row_batch_timer); + Status status = Status::OK(); + if (UNLIKELY(input_block.rows() == 0)) { + return status; + } + + DCHECK(_output_vexpr_ctxs.empty() != true); + + // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec + // failed, just return the error status + Block block; + RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, + input_block, &block)); + const auto total_bytes = block.bytes(); + + if (total_bytes > config::thrift_max_message_size) [[unlikely]] { + const auto total_rows = block.rows(); + const auto sub_block_count = (total_bytes + config::thrift_max_message_size - 1) / + config::thrift_max_message_size; + const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count; + + size_t offset = 0; + while (offset < total_rows) { + size_t rows = std::min(sub_block_rows, total_rows - offset); + auto sub_block = block.clone_empty(); + for (size_t i = 0; i != block.columns(); ++i) { + sub_block.get_by_position(i).column = + block.get_by_position(i).column->cut(offset, rows); + } + offset += rows; + + RETURN_IF_ERROR(_write_one_block(state, sub_block)); + } + return Status::OK(); + } + + return _write_one_block(state, block); +} + template Status VMysqlResultWriter::close(Status) { COUNTER_SET(_sent_rows_counter, _written_rows); diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h index 1b165ecb74830f..b89b8cf1b9086a 100644 --- a/be/src/vec/sink/vmysql_result_writer.h +++ b/be/src/vec/sink/vmysql_result_writer.h @@ -66,6 +66,8 @@ class VMysqlResultWriter final : public ResultWriter { int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const DataTypePtr& type, MysqlRowBuffer& buffer, int scale = -1); + Status _write_one_block(RuntimeState* state, Block& block); + BufferControlBlock* _sinker = nullptr; const VExprContextSPtrs& _output_vexpr_ctxs;