Skip to content

Commit

Permalink
Merge request apache#26 from bag:routine_load_bug
Browse files Browse the repository at this point in the history
[fix][routine-load] fix bug that routine load cannot cancel task when append_data returns an error (apache#8457)
  • Loading branch information
Henry2SS committed Mar 14, 2022
2 parents 4357752 + 3855f08 commit ae373fe
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 18 deletions.
29 changes: 12 additions & 17 deletions be/src/runtime/routine_load/data_consumer_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {

MonotonicStopWatch watch;
watch.start();
Status st;
bool eos = false;
while (true) {
if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
Expand All @@ -142,25 +141,15 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
// waiting all threads finished
_thread_pool.shutdown();
_thread_pool.join();

if (!result_st.ok()) {
// some of consumers encounter errors, cancel this task
kafka_pipe->cancel(result_st.get_error_msg());
return result_st;
}

if (left_bytes == ctx->max_batch_size) {
// nothing to be consumed, we have to cancel it, because
// we do not allow finishing stream load pipe without data
kafka_pipe->cancel("no data");
return Status::Cancelled("Cancelled");
} else {
DCHECK(left_bytes < ctx->max_batch_size);
DCHECK(left_rows < ctx->max_batch_rows);
kafka_pipe->finish();
ctx->kafka_info->cmt_offset = std::move(cmt_offset);
ctx->receive_bytes = ctx->max_batch_size - left_bytes;
return Status::OK();
}
kafka_pipe->finish();
ctx->kafka_info->cmt_offset = std::move(cmt_offset);
ctx->receive_bytes = ctx->max_batch_size - left_bytes;
return Status::OK();
}

RdKafka::Message* msg;
Expand All @@ -170,7 +159,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
<< ", partition: " << msg->partition() << ", offset: " << msg->offset()
<< ", len: " << msg->len();

(kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()),
Status st = (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()),
static_cast<size_t>(msg->len()));
if (st.ok()) {
left_rows--;
Expand All @@ -182,6 +171,12 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
// failed to append this msg, we must stop
LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
eos = true;
{
std::unique_lock<std::mutex> lock(_mutex);
if (result_st.ok()) {
result_st = st;
}
}
}
delete msg;
} else {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/routine_load/kafka_consumer_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ Status KafkaConsumerPipe::append_avro_bytes(const char* data, size_t size) {
try {
d->init(*json_in);
avro::decode(*d, jdw_data);
return append_map(jdw_data);
} catch (avro::Exception& e) {
return Status::InternalError(std::string("avro message deserialize error : ") + e.what());
}
return append_map(jdw_data);
}

Status KafkaConsumerPipe::append_map(const doris::JdwData& jdw_data) {
Expand Down

0 comments on commit ae373fe

Please sign in to comment.