Skip to content

Commit

Permalink
[Opt](load) don't print stack when some errors occur for stream load (a…
Browse files Browse the repository at this point in the history
…pache#38332)

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
liaoxin01 committed Jul 26, 2024
1 parent ee65195 commit 7cc4317
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::InternalError("receive body don't equal with body bytes");
return Status::InternalError<false>("receive body don't equal with body bytes");
}

// if we use non-streaming, MessageBodyFileSink.finish will close the file
Expand Down Expand Up @@ -230,13 +230,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// auth information
if (!parse_basic_auth(*http_req, &ctx->auth)) {
LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
return Status::InternalError("no valid Basic authorization");
return Status::InternalError<false>("no valid Basic authorization");
}

// get format of this put
if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
return Status::InternalError("compress data of JSON format is not supported.");
return Status::InternalError<false>("compress data of JSON format is not supported.");
}
std::string format_str = http_req->header(HTTP_FORMAT_KEY);
if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
Expand All @@ -252,8 +252,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
&ctx->compress_type);
if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
return Status::InternalError("unknown data format, format={}",
http_req->header(HTTP_FORMAT_KEY));
return Status::InternalError<false>("unknown data format, format={}",
http_req->header(HTTP_FORMAT_KEY));
}

// check content length
Expand All @@ -271,16 +271,16 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// json max body size
if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
(ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
return Status::InternalError(
return Status::InternalError<false>(
"The size of this batch exceed the max size [{}] of json type data "
" data [ {} ]. Split the file, or use 'read_json_by_line'",
json_max_body_bytes, ctx->body_bytes);
}
// csv max body size
else if (ctx->body_bytes > csv_max_body_bytes) {
LOG(WARNING) << "body exceed max size." << ctx->brief();
return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes,
ctx->body_bytes);
return Status::InternalError<false>("body exceed max size: {}, data: {}",
csv_max_body_bytes, ctx->body_bytes);
}
} else {
#ifndef BE_TEST
Expand All @@ -298,13 +298,14 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
!ctx->is_chunked_transfer))) {
LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
"content_length or transfer-encoding=chunked";
return Status::InternalError(
return Status::InternalError<false>(
"content_length is empty and transfer-encoding!=chunked, please set content_length "
"or transfer-encoding=chunked");
} else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
ctx->is_chunked_transfer)) {
LOG(WARNING) << "please do not set both content_length and transfer-encoding";
return Status::InternalError("please do not set both content_length and transfer-encoding");
return Status::InternalError<false>(
"please do not set both content_length and transfer-encoding");
}

if (!http_req->header(HTTP_TIMEOUT).empty()) {
Expand Down Expand Up @@ -716,7 +717,8 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
return Status::InternalError<false>(
"group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
Expand All @@ -729,7 +731,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
ss << "This stream load content length <0 (" << content_length
<< "), please check your content length.";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
return Status::InternalError<false>(ss.str());
}
// allow chunked stream load in flink
auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
Expand All @@ -750,7 +752,8 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InternalError("label and group_commit can't be set at the same time");
return Status::InternalError<false>(
"label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
Expand Down

0 comments on commit 7cc4317

Please sign in to comment.