Skip to content

Commit

Permalink
remove unused parameters of update_transaction_state
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
  • Loading branch information
banmoy committed Jan 14, 2025
1 parent b6d1a4a commit 3a9b0ee
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 21 deletions.
3 changes: 1 addition & 2 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) {
}
}

void BatchWriteMgr::update_transaction_state(ExecEnv* exec_env, brpc::Controller* cntl,
const PUpdateTransactionStateRequest* request,
void BatchWriteMgr::update_transaction_state(const PUpdateTransactionStateRequest* request,
PUpdateTransactionStateResponse* response) {
for (int i = 0; i < request->states_size(); i++) {
auto& txn_state = request->states(i);
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/batch_write/batch_write_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ class BatchWriteMgr {
void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request,
PStreamLoadResponse* response);

void update_transaction_state(ExecEnv* exec_env, brpc::Controller* cntl,
const PUpdateTransactionStateRequest* request,
void update_transaction_state(const PUpdateTransactionStateRequest* request,
PUpdateTransactionStateResponse* response);

private:
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ void PInternalServiceImplBase<T>::update_transaction_state(google::protobuf::Rpc
google::protobuf::Closure* done) {
ClosureGuard closure_guard(done);
auto* cntl = static_cast<brpc::Controller*>(cntl_base);
_exec_env->batch_write_mgr()->update_transaction_state(_exec_env, cntl, request, response);
_exec_env->batch_write_mgr()->update_transaction_state(request, response);
}

template class PInternalServiceImplBase<PInternalService>;
Expand Down
3 changes: 1 addition & 2 deletions be/test/runtime/batch_write/batch_write_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ TEST_F(BatchWriteMgrTest, stream_load_rpc_fail) {
}

TEST_F(BatchWriteMgrTest, update_transaction_state) {
brpc::Controller cntl;
PUpdateTransactionStateRequest request;
std::vector<TxnState> expected_cache_state;

Expand Down Expand Up @@ -417,7 +416,7 @@ TEST_F(BatchWriteMgrTest, update_transaction_state) {
expected_cache_state.push_back({TTransactionStatus::UNKNOWN, ""});

PUpdateTransactionStateResponse response;
_batch_write_mgr->update_transaction_state(_exec_env, &cntl, &request, &response);
_batch_write_mgr->update_transaction_state(&request, &response);
ASSERT_EQ(request.states_size(), response.results_size());
for (int i = 1; i <= expected_cache_state.size(); ++i) {
ASSERT_EQ(TStatusCode::OK, response.results(i - 1).status_code());
Expand Down
35 changes: 21 additions & 14 deletions be/test/runtime/batch_write/txn_state_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,15 @@ TEST_F(TxnStateCacheTest, cache_push_state_notify_subscriber) {
ASSERT_OK(s2_1.status());
assert_txn_state_eq({TTransactionStatus::ABORTED, "artificial failure"}, s2_1.value()->current_state());

auto t1_1 = std::thread(
[&]() { wait_func(s1_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""})); });
auto t1_1 = std::thread([&]() {
wait_func(s1_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""}));
});
ASSERT_OK(cache->push_state(1, TTransactionStatus::VISIBLE, ""));
t1_1.join();

auto t1_2 = std::thread(
[&]() { wait_func(s1_2.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""})); });
auto t1_2 = std::thread([&]() {
wait_func(s1_2.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""}));
});
t1_2.join();

auto t2_1 = std::thread([&]() {
Expand Down Expand Up @@ -493,14 +495,18 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) {
ASSERT_TRUE(poller->is_txn_pending(3));
ASSERT_TRUE(poller->is_txn_pending(4));

auto t1_1 = std::thread(
[&]() { wait_func(s1_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""})); });
auto t1_2 = std::thread(
[&]() { wait_func(s1_2.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""})); });
auto t2_1 = std::thread(
[&]() { wait_func(s2_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""})); });
auto t3_1 = std::thread(
[&]() { wait_func(s3_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""})); });
auto t1_1 = std::thread([&]() {
wait_func(s1_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""}));
});
auto t1_2 = std::thread([&]() {
wait_func(s1_2.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""}));
});
auto t2_1 = std::thread([&]() {
wait_func(s2_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""}));
});
auto t3_1 = std::thread([&]() {
wait_func(s3_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""}));
});

// advance time and should trigger txn 1, 2 and 3 to poll
SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::get_current_ms", [&](void* arg) { *((int64_t*)arg) = 160; });
Expand All @@ -519,8 +525,9 @@ TEST_F(TxnStateCacheTest, cache_poll_state_notify_subscriber) {
ASSERT_EQ(3, num_rpc.load());

assert_txn_state_eq({TTransactionStatus::PREPARE, ""}, s4_1.value()->current_state());
auto t4_1 = std::thread(
[&]() { wait_func(s4_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""})); });
auto t4_1 = std::thread([&]() {
wait_func(s4_1.value().get(), 60000000, StatusOr<TxnState>({TTransactionStatus::VISIBLE, ""}));
});
SyncPoint::GetInstance()->SetCallBack("TxnStatePoller::_execute_poll::response", [&](void* arg) {
TGetLoadTxnStatusResult* result = (TGetLoadTxnStatusResult*)arg;
result->__set_status(TTransactionStatus::COMMITTED);
Expand Down

0 comments on commit 3a9b0ee

Please sign in to comment.