Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Introduce TxnStateCache for merge commit sync mode #55001

Merged
merged 13 commits into from
Jan 16, 2025
22 changes: 13 additions & 9 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1526,15 +1526,19 @@ CONF_mInt32(apply_version_slow_log_sec, "30");
CONF_mInt32(merge_commit_stream_load_pipe_block_wait_us, "500");
// The maximum number of bytes that the merge commit stream load pipe can buffer.
CONF_mInt64(merge_commit_stream_load_pipe_max_buffered_bytes, "1073741824");
CONF_Int32(batch_write_thread_pool_num_min, "0");
CONF_Int32(batch_write_thread_pool_num_max, "512");
CONF_Int32(batch_write_thread_pool_queue_size, "4096");
CONF_mInt32(batch_write_default_timeout_ms, "600000");
CONF_mInt32(batch_write_rpc_request_retry_num, "10");
CONF_mInt32(batch_write_rpc_request_retry_interval_ms, "500");
CONF_mInt32(batch_write_rpc_reqeust_timeout_ms, "10000");
CONF_mInt32(batch_write_poll_load_status_interval_ms, "200");
CONF_mBool(batch_write_trace_log_enable, "false");
// Sizing of the merge commit thread pool.
// NOTE(review): presumably the min/max thread count and the task queue capacity of the
// executor handed to BatchWriteMgr; the pool creation site is not shown here — confirm.
CONF_Int32(merge_commit_thread_pool_num_min, "0");
CONF_Int32(merge_commit_thread_pool_num_max, "512");
CONF_Int32(merge_commit_thread_pool_queue_size, "4096");
// Fallback total timeout (ms) when waiting for a load to finish and the request does not
// carry a positive timeout of its own.
CONF_mInt32(merge_commit_default_timeout_ms, "600000");
// Upper bound on the number of retries performed by IsomorphicBatchWrite::_execute_write.
CONF_mInt32(merge_commit_rpc_request_retry_num, "10");
// How long (ms) a retry waits for a stream load pipe to show up when none is alive.
CONF_mInt32(merge_commit_rpc_request_retry_interval_ms, "500");
// Timeout (ms) passed to the requestMergeCommit thrift RPC sent to the FE master.
// NOTE: "reqeust" is a typo of "request"; the name is kept because other code refers to it.
CONF_mInt32(merge_commit_rpc_reqeust_timeout_ms, "10000");
// Enables the TRACE_BATCH_WRITE log statements.
CONF_mBool(merge_commit_trace_log_enable, "false");
// Capacity given to the TxnStateCache when BatchWriteMgr creates it. A runtime update of this
// value is forwarded to the live cache by the update-config HTTP action.
CONF_mInt32(merge_commit_txn_state_cache_capacity, "4096");
// NOTE(review): the four options below are consumed by TxnStateCache, whose implementation is
// not shown here. Presumably: how often expired txn states are cleaned, how long a txn state is
// retained, how often the txn state is polled, and how many failed polls are tolerated —
// confirm against txn_state_cache.cpp.
CONF_mInt32(merge_commit_txn_state_clean_interval_sec, "300");
CONF_mInt32(merge_commit_txn_state_expire_time_sec, "1800");
CONF_mInt32(merge_commit_txn_state_poll_interval_ms, "2000");
CONF_mInt32(merge_commit_txn_state_poll_max_fail_times, "2");

CONF_mBool(enable_load_spill, "false");
// Max chunk bytes which allow to spill per flush. Default is 10MB.
Expand Down
10 changes: 10 additions & 0 deletions be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "runtime/batch_write/batch_write_mgr.h"
#include "runtime/batch_write/txn_state_cache.h"
#include "storage/compaction_manager.h"
#include "storage/lake/compaction_scheduler.h"
#include "storage/lake/load_spill_block_manager.h"
Expand Down Expand Up @@ -327,6 +329,14 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
_config_callback.emplace("load_spill_max_merge_bytes", [&]() -> Status {
return StorageEngine::instance()->load_spill_block_merge_executor()->refresh_max_thread_num();
});
// Forward a runtime change of merge_commit_txn_state_cache_capacity to the live txn state cache.
// The update is silently skipped (and still reported as OK) when no BatchWriteMgr exists.
_config_callback.emplace("merge_commit_txn_state_cache_capacity", [&]() -> Status {
    LOG(INFO) << "set merge_commit_txn_state_cache_capacity: " << config::merge_commit_txn_state_cache_capacity;
    auto mgr = _exec_env->batch_write_mgr();
    if (!mgr) {
        // Nothing to resize.
        return Status::OK();
    }
    mgr->txn_state_cache()->set_capacity(config::merge_commit_txn_state_cache_capacity);
    return Status::OK();
});

#ifdef USE_STAROS
#define UPDATE_STARLET_CONFIG(BE_CONFIG, STARLET_CONFIG) \
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ set(RUNTIME_FILES
batch_write/isomorphic_batch_write.cpp
batch_write/batch_write_mgr.cpp
batch_write/batch_write_util.cpp
batch_write/txn_state_cache.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_group.cpp
routine_load/data_consumer_pool.cpp
Expand Down
55 changes: 53 additions & 2 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@

namespace starrocks {

// Takes ownership of the executor. The txn state cache is not created here; it is built in init().
BatchWriteMgr::BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor) : _executor(std::move(executor)) {}

// Creates the txn state cache and initializes it.
//
// The cache receives its own token on the executor's thread pool, created in CONCURRENT
// execution mode, and is sized by config::merge_commit_txn_state_cache_capacity.
// Returns whatever TxnStateCache::init() reports.
Status BatchWriteMgr::init() {
    std::unique_ptr<ThreadPoolToken> cache_token =
            _executor->get_thread_pool()->new_token(ThreadPool::ExecutionMode::CONCURRENT);
    _txn_state_cache = std::make_unique<TxnStateCache>(config::merge_commit_txn_state_cache_capacity,
                                                       std::move(cache_token));
    return _txn_state_cache->init();
}

Status BatchWriteMgr::register_stream_load_pipe(StreamLoadContext* pipe_ctx) {
BatchWriteId batch_write_id = {
.db = pipe_ctx->db, .table = pipe_ctx->table, .load_params = pipe_ctx->load_parameters};
Expand Down Expand Up @@ -78,7 +87,7 @@ StatusOr<IsomorphicBatchWriteSharedPtr> BatchWriteMgr::_get_batch_write(const st
return it->second;
}

auto batch_write = std::make_shared<IsomorphicBatchWrite>(batch_write_id, _executor.get());
auto batch_write = std::make_shared<IsomorphicBatchWrite>(batch_write_id, _executor.get(), _txn_state_cache.get());
Status st = batch_write->init();
if (!st.ok()) {
LOG(ERROR) << "Fail to init batch write, " << batch_write_id << ", status: " << st;
Expand All @@ -105,6 +114,10 @@ void BatchWriteMgr::stop() {
for (auto& batch_write : stop_writes) {
batch_write->stop();
}
if (_txn_state_cache) {
_txn_state_cache->stop();
}
_executor->get_thread_pool()->shutdown();
}

StatusOr<StreamLoadContext*> BatchWriteMgr::create_and_register_pipe(
Expand Down Expand Up @@ -224,7 +237,45 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
ctx->buffer->flip();
ctx->receive_bytes = io_buf.size();
ctx->mc_read_data_cost_nanos = MonotonicNanos() - ctx->start_nanos;
ctx->status = exec_env->batch_write_mgr()->append_data(ctx);
ctx->status = append_data(ctx);
}

// Maps a protobuf transaction status onto the equivalent thrift enum value.
// Any value without an explicit mapping is reported as TTransactionStatus::UNKNOWN.
static TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) {
    TTransactionStatus::type thrift_status = TTransactionStatus::UNKNOWN;
    switch (status) {
    case TRANS_PREPARE:
        thrift_status = TTransactionStatus::PREPARE;
        break;
    case TRANS_PREPARED:
        thrift_status = TTransactionStatus::PREPARED;
        break;
    case TRANS_COMMITTED:
        thrift_status = TTransactionStatus::COMMITTED;
        break;
    case TRANS_VISIBLE:
        thrift_status = TTransactionStatus::VISIBLE;
        break;
    case TRANS_ABORTED:
        thrift_status = TTransactionStatus::ABORTED;
        break;
    case TRANS_UNKNOWN:
    default:
        // Explicit unknown and unrecognized values share the same fallback.
        thrift_status = TTransactionStatus::UNKNOWN;
        break;
    }
    return thrift_status;
}

// Pushes every transaction state carried by the request into the txn state cache.
//
// The states are handled in request order, and exactly one result is appended to the response
// per state, holding the status returned by the cache for that push. A failed push is logged
// as a warning and does not stop the remaining states from being processed.
void BatchWriteMgr::update_transaction_state(const PUpdateTransactionStateRequest* request,
                                             PUpdateTransactionStateResponse* response) {
    for (const auto& state : request->states()) {
        auto push_st = _txn_state_cache->push_state(state.txn_id(), to_thrift_txn_status(state.status()),
                                                    state.reason());
        if (push_st.ok()) {
            TRACE_BATCH_WRITE << "Update transaction state, txn_id: " << state.txn_id()
                              << ", txn status: " << TransactionStatusPB_Name(state.status())
                              << ", status reason: " << state.reason();
        } else {
            LOG(WARNING) << "Failed to update transaction state, txn_id: " << state.txn_id()
                         << ", txn status: " << TransactionStatusPB_Name(state.status())
                         << ", status reason: " << state.reason() << ", update error: " << push_st;
        }
        // Report the per-state outcome back to the caller, success or not.
        push_st.to_protobuf(response->add_results());
    }
}

} // namespace starrocks
17 changes: 14 additions & 3 deletions be/src/runtime/batch_write/batch_write_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "common/statusor.h"
#include "runtime/batch_write/isomorphic_batch_write.h"
#include "runtime/batch_write/txn_state_cache.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/bthreads/bthread_shared_mutex.h"
#include "util/bthreads/executor.h"
Expand All @@ -32,10 +33,13 @@ class ExecEnv;
class PStreamLoadRequest;
class PStreamLoadResponse;
class StreamLoadContext;
class PUpdateTransactionStateRequest;
class PUpdateTransactionStateResponse;

class BatchWriteMgr {
public:
BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor) : _executor(std::move(executor)){};
BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor);
Status init();

Status register_stream_load_pipe(StreamLoadContext* pipe_ctx);
void unregister_stream_load_pipe(StreamLoadContext* pipe_ctx);
Expand All @@ -45,19 +49,26 @@ class BatchWriteMgr {

void stop();

bthreads::ThreadPoolExecutor* executor() { return _executor.get(); }
TxnStateCache* txn_state_cache() { return _txn_state_cache.get(); }

static StatusOr<StreamLoadContext*> create_and_register_pipe(
ExecEnv* exec_env, BatchWriteMgr* batch_write_mgr, const string& db, const string& table,
const std::map<std::string, std::string>& load_parameters, const string& label, long txn_id,
const TUniqueId& load_id, int32_t batch_write_interval_ms);

static void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request,
PStreamLoadResponse* response);
void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request,
PStreamLoadResponse* response);

void update_transaction_state(const PUpdateTransactionStateRequest* request,
PUpdateTransactionStateResponse* response);

private:
StatusOr<IsomorphicBatchWriteSharedPtr> _get_batch_write(const BatchWriteId& batch_write_id,
bool create_if_missing);

std::unique_ptr<bthreads::ThreadPoolExecutor> _executor;
std::unique_ptr<TxnStateCache> _txn_state_cache;
bthreads::BThreadSharedMutex _rw_mutex;
std::unordered_map<BatchWriteId, IsomorphicBatchWriteSharedPtr, BatchWriteIdHash, BatchWriteIdEqual>
_batch_write_map;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/batch_write/batch_write_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace starrocks {

#define TRACE_BATCH_WRITE LOG_IF(INFO, config::batch_write_trace_log_enable)
// INFO-level trace logging for batch write, emitted only when config::merge_commit_trace_log_enable is set.
#define TRACE_BATCH_WRITE LOG_IF(INFO, config::merge_commit_trace_log_enable)

using BatchWriteLoadParams = std::map<std::string, std::string>;

Expand Down
105 changes: 33 additions & 72 deletions be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ class AsyncAppendDataContext {
std::atomic_int num_retries{-1};
};

IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor)
: _batch_write_id(std::move(batch_write_id)), _executor(executor) {}
// Stores the batch write id together with the executor and txn state cache pointers.
// Both pointers are passed in as raw pointers; BatchWriteMgr hands over the ones it owns,
// so they are presumably expected to outlive this object — confirm with the owner's shutdown order.
IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor,
TxnStateCache* txn_state_cache)
: _batch_write_id(std::move(batch_write_id)), _executor(executor), _txn_state_cache(txn_state_cache) {}

Status IsomorphicBatchWrite::init() {
TEST_ERROR_POINT("IsomorphicBatchWrite::init::error");
Expand Down Expand Up @@ -220,7 +221,6 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
}
int64_t start_ts = MonotonicNanos();
AsyncAppendDataContext* async_ctx = new AsyncAppendDataContext(data_ctx);
async_ctx->ref();
async_ctx->create_time_ts.store(MonotonicNanos());
Expand Down Expand Up @@ -258,10 +258,7 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
if (_batch_write_async) {
return Status::OK();
}
int64_t timeout_ms =
data_ctx->timeout_second > 0 ? data_ctx->timeout_second * 1000 : config::batch_write_default_timeout_ms;
int64_t left_timeout_ns = std::max((int64_t)0, timeout_ms * 1000 * 1000 - (MonotonicNanos() - start_ts));
return _wait_for_load_status(data_ctx, left_timeout_ns);
return _wait_for_load_finish(data_ctx);
}

int IsomorphicBatchWrite::_execute_tasks(void* meta, bthread::TaskIterator<Task>& iter) {
Expand Down Expand Up @@ -309,7 +306,7 @@ Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) {
SCOPED_RAW_TIMER(&write_data_cost_ns);
st = _write_data_to_pipe(async_ctx);
}
if (st.ok() || num_retries >= config::batch_write_rpc_request_retry_num) {
if (st.ok() || num_retries >= config::merge_commit_rpc_request_retry_num) {
break;
}
num_retries += 1;
Expand All @@ -324,7 +321,7 @@ Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) {
SCOPED_RAW_TIMER(&wait_pipe_cost_ns);
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_alive_stream_load_pipe_ctxs.empty()) {
_cv.wait_for(lock, config::batch_write_rpc_request_retry_interval_ms * 1000);
_cv.wait_for(lock, config::merge_commit_rpc_request_retry_interval_ms * 1000);
}
}
}
Expand Down Expand Up @@ -404,7 +401,7 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &response](FrontendServiceConnection& client) { client->requestMergeCommit(response, request); },
config::batch_write_rpc_reqeust_timeout_ms);
config::merge_commit_rpc_reqeust_timeout_ms);
TRACE_BATCH_WRITE << "receive requestBatchWrite response, " << _batch_write_id
<< ", user label: " << data_ctx->label << ", master: " << master_addr
<< ", cost: " << ((MonotonicNanos() - start_ts) / 1000) << "us, status: " << st
Expand All @@ -418,78 +415,42 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
return st.ok() ? Status(response.status) : st;
}

bool is_final_load_status(const TTransactionStatus::type& status) {
switch (status) {
case TTransactionStatus::VISIBLE:
case TTransactionStatus::ABORTED:
case TTransactionStatus::UNKNOWN:
return true;
default:
return false;
Status IsomorphicBatchWrite::_wait_for_load_finish(StreamLoadContext* data_ctx) {
int64_t total_timeout_ms =
data_ctx->timeout_second > 0 ? data_ctx->timeout_second * 1000 : config::merge_commit_default_timeout_ms;
int64_t left_timeout_ms =
std::max((int64_t)0, total_timeout_ms - (MonotonicNanos() - data_ctx->start_nanos) / 1000000);
StatusOr<TxnStateSubscriberPtr> subscriber_status = _txn_state_cache->subscribe_state(
data_ctx->txn_id, data_ctx->label, data_ctx->db, data_ctx->table, data_ctx->auth);
if (!subscriber_status.ok()) {
return Status::InternalError("Failed to create txn state subscriber, " +
subscriber_status.status().to_string());
}
}

// TODO just poll the load status periodically. improve it later, such as cache the label, and FE notify the BE
Status IsomorphicBatchWrite::_wait_for_load_status(StreamLoadContext* data_ctx, int64_t timeout_ns) {
TxnStateSubscriberPtr subscriber = std::move(subscriber_status.value());
int64_t start_ts = MonotonicNanos();
int64_t wait_load_finish_ns = std::max((int64_t)0, data_ctx->mc_left_merge_time_nanos) + 1000000;
bthread_usleep(std::min(wait_load_finish_ns, timeout_ns) / 1000);
TGetLoadTxnStatusRequest request;
request.__set_db(_batch_write_id.db);
request.__set_tbl(_batch_write_id.table);
request.__set_txnId(data_ctx->txn_id);
set_request_auth(&request, data_ctx->auth);
TGetLoadTxnStatusResult response;
Status st;
do {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
}
#ifndef BE_TEST
int64_t rpc_ts = MonotonicNanos();
TNetworkAddress master_addr = get_master_address();
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &response](FrontendServiceConnection& client) {
client->getLoadTxnStatus(response, request);
},
config::batch_write_rpc_reqeust_timeout_ms);
TRACE_BATCH_WRITE << "receive getLoadTxnStatus response, " << _batch_write_id
<< ", user label: " << data_ctx->label << ", txn_id: " << data_ctx->txn_id
<< ", label: " << data_ctx->batch_write_label << ", master: " << master_addr
<< ", cost: " << ((MonotonicNanos() - rpc_ts) / 1000) << "us, status: " << st
<< ", response: " << response;
#else
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::request", &request);
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::status", &st);
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::response", &response);
#endif
if (st.ok() && is_final_load_status(response.status)) {
break;
}
int64_t left_timeout_ns = timeout_ns - (MonotonicNanos() - start_ts);
if (left_timeout_ns <= 0) {
break;
}
bthread_usleep(
std::min(config::batch_write_poll_load_status_interval_ms * (int64_t)1000, left_timeout_ns / 1000));
} while (true);
StatusOr<TxnState> status_or = subscriber->wait_finished_state(left_timeout_ms * 1000);
data_ctx->mc_wait_finish_cost_nanos = MonotonicNanos() - start_ts;
if (!st.ok()) {
return Status::InternalError("Failed to get load status, " + st.to_string());
TRACE_BATCH_WRITE << "finish to wait load, " << _batch_write_id << ", user label: " << data_ctx->label
<< ", txn_id: " << data_ctx->txn_id << ", load label: " << data_ctx->batch_write_label
<< ", cost: " << (data_ctx->mc_wait_finish_cost_nanos / 1000)
<< "us, wait status: " << status_or.status() << ", "
<< (status_or.ok() ? status_or.value() : subscriber->current_state());
if (!status_or.ok()) {
TxnState current_state = subscriber->current_state();
return Status::InternalError(fmt::format("Failed to get load final status, current status: {}, error: {}",
to_string(current_state.txn_status), status_or.status().to_string()));
}
switch (response.status) {
case TTransactionStatus::PREPARE:
case TTransactionStatus::PREPARED:
return Status::TimedOut("load timeout, txn status: " + to_string(response.status));
switch (status_or.value().txn_status) {
case TTransactionStatus::COMMITTED:
return Status::PublishTimeout("Load has not been published before timeout");
case TTransactionStatus::VISIBLE:
return Status::OK();
case TTransactionStatus::ABORTED:
return Status::InternalError("Load is aborted, reason: " + response.reason);
return Status::InternalError("Load is aborted, reason: " + status_or.value().reason);
case TTransactionStatus::UNKNOWN:
return Status::InternalError("Can't find the transaction, reason: " + status_or.value().reason);
default:
return Status::InternalError("Load status is unknown: " + to_string(response.status));
return Status::InternalError("Load status is not final: " + to_string(status_or.value().txn_status));
}
}

Expand Down
Loading
Loading