[Opt](load) don't print stack when some errors occur for stream load #38332

Merged · 1 commit · Jul 25, 2024
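
This change switches a set of stream-load error returns from Status::InternalError(...) to Status::InternalError<false>(...), so that failures caused by bad client input (invalid auth header, unsupported format, oversized body, inconsistent framing headers, bad group_commit mode) no longer print a stack trace in the BE log. Below is a minimal, hypothetical sketch of the underlying pattern, assuming a non-type bool template parameter on the status factory that gates stack-trace capture at compile time; the Status type here is a stand-in, not Doris's actual doris::Status, and only the call shape matches the diff.

    // Hypothetical stand-in for the pattern used by the real Status factory:
    // a compile-time flag chooses whether the error path captures a stack trace.
    #include <iostream>
    #include <string>
    #include <utility>

    struct Status {
        std::string msg;

        // stacktrace defaults to true, matching the plain InternalError(...) calls;
        // expected, user-triggered failures opt out with InternalError<false>(...).
        template <bool stacktrace = true>
        static Status InternalError(std::string message) {
            if constexpr (stacktrace) {
                // Assumption: the real implementation walks the stack here;
                // this stand-in only marks where that cost would be paid.
                message += "\n<captured stack trace>";
            }
            return Status{std::move(message)};
        }
    };

    int main() {
        std::cout << Status::InternalError("internal bug").msg << "\n";       // trace kept
        std::cout << Status::InternalError<false>("bad request").msg << "\n"; // trace skipped
    }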
29 changes: 16 additions & 13 deletions be/src/http/action/stream_load.cpp
@@ -146,7 +146,7 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
     if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
         LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
                      << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
-        return Status::InternalError("receive body don't equal with body bytes");
+        return Status::InternalError<false>("receive body don't equal with body bytes");
     }

     // if we use non-streaming, MessageBodyFileSink.finish will close the file
@@ -232,13 +232,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     // auth information
     if (!parse_basic_auth(*http_req, &ctx->auth)) {
         LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
-        return Status::InternalError("no valid Basic authorization");
+        return Status::InternalError<false>("no valid Basic authorization");
     }

     // get format of this put
     if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
         iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
-        return Status::InternalError("compress data of JSON format is not supported.");
+        return Status::InternalError<false>("compress data of JSON format is not supported.");
     }
     std::string format_str = http_req->header(HTTP_FORMAT_KEY);
     if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
@@ -254,8 +254,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
                            &ctx->compress_type);
     if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
-        return Status::InternalError("unknown data format, format={}",
-                                     http_req->header(HTTP_FORMAT_KEY));
+        return Status::InternalError<false>("unknown data format, format={}",
+                                            http_req->header(HTTP_FORMAT_KEY));
     }

     // check content length
@@ -273,16 +273,16 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
         // json max body size
         if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
             (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
-            return Status::InternalError(
+            return Status::InternalError<false>(
                     "The size of this batch exceed the max size [{}] of json type data "
                     " data [ {} ]. Split the file, or use 'read_json_by_line'",
                     json_max_body_bytes, ctx->body_bytes);
         }
         // csv max body size
         else if (ctx->body_bytes > csv_max_body_bytes) {
             LOG(WARNING) << "body exceed max size." << ctx->brief();
-            return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes,
-                                         ctx->body_bytes);
+            return Status::InternalError<false>("body exceed max size: {}, data: {}",
+                                                csv_max_body_bytes, ctx->body_bytes);
         }
     } else {
 #ifndef BE_TEST
@@ -300,13 +300,14 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
                   !ctx->is_chunked_transfer))) {
         LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
                         "content_length or transfer-encoding=chunked";
-        return Status::InternalError(
+        return Status::InternalError<false>(
                 "content_length is empty and transfer-encoding!=chunked, please set content_length "
                 "or transfer-encoding=chunked");
     } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
                         ctx->is_chunked_transfer)) {
         LOG(WARNING) << "please do not set both content_length and transfer-encoding";
-        return Status::InternalError("please do not set both content_length and transfer-encoding");
+        return Status::InternalError<false>(
+                "please do not set both content_length and transfer-encoding");
     }

     if (!http_req->header(HTTP_TIMEOUT).empty()) {
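
Note: the two framing checks above line up with HTTP/1.1 message rules (RFC 7230 §3.3): a request body must be delimited either by Content-Length or by Transfer-Encoding: chunked, and a sender must not supply both headers at once. Both rejections are therefore client errors, which is why they drop the stack trace.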
@@ -727,7 +728,8 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
-        return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
+        return Status::InternalError<false>(
+                "group_commit can only be [async_mode, sync_mode, off_mode]");
     }
     if (config::wait_internal_group_commit_finish) {
         group_commit_mode = "sync_mode";
@@ -740,7 +742,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
         ss << "This stream load content length <0 (" << content_length
            << "), please check your content length.";
         LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+        return Status::InternalError<false>(ss.str());
     }
     // allow chunked stream load in flink
     auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
@@ -761,7 +763,8 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
         if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
-            return Status::InternalError("label and group_commit can't be set at the same time");
+            return Status::InternalError<false>(
+                    "label and group_commit can't be set at the same time");
         }
         ctx->group_commit = true;
         if (iequal(group_commit_mode, "async_mode")) {
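
All of the converted returns reject malformed client input rather than report BE-internal bugs, so capturing a stack trace adds log noise and capture cost without diagnostic value. The <false> argument leaves the error message itself unchanged; only the appended stack trace is skipped, as in the sketch near the top of this page.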