Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recover lost locks as broken #13181

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall
ydb/core/kqp/ut/cost KqpCost.OlapWriteRow
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite
ydb/core/kqp/ut/olap KqpOlap.DeleteAbsent+Reboot
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpOlap.ManyColumnShardsWithRestarts
Expand Down
69 changes: 61 additions & 8 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2992,9 +2992,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}

Y_UNIT_TEST_TWIN(DeleteAbsent, Reboot) {
//This test tries to DELETE from a table with WHERE condition that matches no rows
//It corresponds to a SCAN, then NO write then COMMIT
void TestDeleteAbsent(const size_t shardCount, bool reboot) {
//This test tries to DELETE from a table when there are no rows to delete on some shard
//It corresponds to a SCAN, then NO write then COMMIT on that shard
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();

NKikimrConfig::TAppConfig appConfig;
Expand All @@ -3006,20 +3006,73 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TTestHelper::TColumnSchema().SetName("value").SetType(NScheme::NTypeIds::Int32).SetNullable(true),
};
TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ttt").SetPrimaryKey({ "id" }).SetSharding({ "id" }).SetSchema(schema);
testTable.SetName("/Root/ttt").SetPrimaryKey({ "id" }).SetSharding({ "id" }).SetSchema(schema).SetMinPartitionsCount(shardCount);
testHelper.CreateTable(testTable);
auto client = testHelper.GetKikimr().GetQueryClient();
//1. Insert exactly one row into the table, so that only one shard will contain a row
const auto result = client
.ExecuteQuery(
R"(
INSERT INTO `/Root/ttt` (id, value) VALUES
(1, 11)
)",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
//2. Ensure that there is actually 1 row in the table
{
const auto resultSelect = client
.ExecuteQuery(
"SELECT * FROM `/Root/ttt`",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();

if (Reboot) {
UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());
const auto resultSets = resultSelect.GetResultSets();
UNIT_ASSERT_VALUES_EQUAL(resultSets.size(), 1);
const auto resultSet = resultSets[0];
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
}
if (reboot) {
csController->SetRestartOnLocalTxCommitted("TProposeWriteTransaction");
}
auto client = testHelper.GetKikimr().GetQueryClient();
//DELETE 1 row from one shard and 0 rows from others
const auto resultDelete =
client
.ExecuteQuery(
"DELETE from `/Root/ttt` WHERE value % 2 == 1;",
"DELETE from `/Root/ttt` ",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_C(resultDelete.IsSuccess(), resultDelete.GetIssues().ToString());
UNIT_ASSERT_C(resultDelete.IsSuccess() != reboot, resultDelete.GetIssues().ToString());
{
const auto resultSelect = client
.ExecuteQuery(
"SELECT * FROM `/Root/ttt`",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();

UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());
const auto resultSets = resultSelect.GetResultSets();
UNIT_ASSERT_VALUES_EQUAL(resultSets.size(), 1);
const auto resultSet = resultSets[0];
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), reboot ? 1 : 0);

}
//DELETE 0 rows from every shard
//(the table is empty here unless the previous DELETE failed in the reboot case, see the row-count check above)
const auto resultDelete2 =
    client
        .ExecuteQuery(
            "DELETE from `/Root/ttt` WHERE id < 100",
            NYdb::NQuery::TTxControl::BeginTx().CommitTx())
        .GetValueSync();
//The query is expected to succeed without reboot and to fail with reboot.
//Print the issues of this DELETE (resultDelete2), not those of the initial INSERT (`result`),
//so that a failed assertion shows the relevant diagnostics.
UNIT_ASSERT_C(resultDelete2.IsSuccess() != reboot, resultDelete2.GetIssues().ToString());
}
// Runs the shared TestDeleteAbsent scenario with shardCount = 1,
// forwarding the twin flag Reboot as the `reboot` argument.
// NOTE(review): presumably Y_UNIT_TEST_TWIN instantiates the test for both Reboot values - confirm against the macro definition.
Y_UNIT_TEST_TWIN(DeleteAbsentSingleShard, Reboot) {
TestDeleteAbsent(1, Reboot);
}

// Runs the shared TestDeleteAbsent scenario with shardCount = 2,
// forwarding the twin flag Reboot as the `reboot` argument.
// With a single inserted row, at least one shard has nothing to delete.
// NOTE(review): presumably Y_UNIT_TEST_TWIN instantiates the test for both Reboot values - confirm against the macro definition.
Y_UNIT_TEST_TWIN(DeleteAbsentMultipleShards, Reboot) {
TestDeleteAbsent(2, Reboot);
}
}

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/columnshard/operations/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ bool TOperationsManager::Load(NTabletFlatExecutor::TTransactionContext& txc) {
while (!rowset.EndOfSet()) {
const ui64 lockId = rowset.GetValue<Schema::OperationTxIds::LockId>();
const ui64 txId = rowset.GetValue<Schema::OperationTxIds::TxId>();
AFL_VERIFY(LockFeatures.contains(lockId))("lock_id", lockId);
if (auto it = LockFeatures.find(lockId); it == LockFeatures.end()) {
auto lock = TLockFeatures(lockId, 0);
lock.SetBroken();
LockFeatures.emplace(lockId, std::move(lock));
}
AFL_VERIFY(Tx2Lock.emplace(txId, lockId).second);
if (!rowset.Next()) {
return false;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/operations/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ namespace NKikimr::NColumnShard {
class TColumnShard;
class TLockFeatures;

class TLockSharingInfo {
class TLockSharingInfo: TMoveOnly {
private:
const ui64 LockId;
const ui64 Generation;
TAtomicCounter InternalGenerationCounter = 0;
TAtomicCounter Broken = 0;
std::atomic<bool> Broken = false;
TAtomicCounter WritesCounter = 0;
friend class TLockFeatures;

Expand All @@ -43,7 +43,7 @@ class TLockSharingInfo {
}

bool IsBroken() const {
return Broken.Val();
return Broken;
}

ui64 GetCounter() const {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/transactions/locks/interaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ class TPointInfo {
return StartTxIds.empty() && FinishTxIds.empty() && IntervalTxIds.empty();
}

void ProvideTxIdsFrom(const TPointInfo& previouse) {
for (auto&& i : previouse.IntervalTxIds) {
void ProvideTxIdsFrom(const TPointInfo& previous) {
for (auto&& i : previous.IntervalTxIds) {
auto provided = i.second;
{
auto it = StartTxIds.find(i.first);
Expand Down
Loading