From f9e7cae2e0b03019dddaf465ceb54f9575978df7 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 7 Jan 2025 15:16:48 +0300 Subject: [PATCH 01/13] fix incorrect data validation and correction in case WO/RO Txs --- .../tx/columnshard/columnshard__write.cpp | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index a2b254070990..c3d5c8a87d9f 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -315,18 +315,22 @@ class TCommitOperation { LockId = lock.GetLockId(); SendingShards = std::set(locks.GetSendingShards().begin(), locks.GetSendingShards().end()); ReceivingShards = std::set(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end()); - const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty(); - if (!singleShardTx) { - if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is absent in sending and receiving lists"); - } - if (locks.HasArbiterColumnShard()) { - ArbiterColumnShard = locks.GetArbiterColumnShard(); - } else { - AFL_VERIFY(!ReceivingShards.empty()); + if (ReceivingShards.size()) { + AFL_VERIFY(SendingShards.size()); + if (!locks.HasArbiterColumnShard()) { ArbiterColumnShard = *ReceivingShards.begin(); + if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { + return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + } + } else { + ArbiterColumnShard = locks.GetArbiterColumnShard(); + AFL_VERIFY(ArbiterColumnShard); + if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { + return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + } } - AFL_VERIFY(ArbiterColumnShard); + } else { + AFL_VERIFY(!SendingShards.size()); } Generation = lock.GetGeneration(); From d73992a85517f5dada3bb6eef54ee93e9c9597be Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 7 Jan 2025 15:17:17 +0300 Subject: [PATCH 02/13] correct --- ydb/core/tx/columnshard/columnshard__write.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index c3d5c8a87d9f..562b3ac0a44f 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -315,8 +315,7 @@ class TCommitOperation { LockId = lock.GetLockId(); SendingShards = std::set(locks.GetSendingShards().begin(), locks.GetSendingShards().end()); ReceivingShards = std::set(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end()); - if (ReceivingShards.size()) { - AFL_VERIFY(SendingShards.size()); + if (ReceivingShards.size() && SendingShards.size()) { if (!locks.HasArbiterColumnShard()) { ArbiterColumnShard = *ReceivingShards.begin(); if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { @@ -330,7 +329,7 @@ class TCommitOperation { } } } else { - AFL_VERIFY(!SendingShards.size()); + AFL_VERIFY(!SendingShards.size() && !ReceivingShards.size()); } Generation = lock.GetGeneration(); From 88f12ce76570f4b8472f81def0f75317085a006a Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 7 Jan 2025 15:19:16 +0300 Subject: [PATCH 03/13] correct --- ydb/core/tx/columnshard/columnshard__write.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 562b3ac0a44f..badc5e3f020a 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -315,6 +315,9 @@ class TCommitOperation { LockId = lock.GetLockId(); SendingShards = std::set(locks.GetSendingShards().begin(), locks.GetSendingShards().end()); ReceivingShards = std::set(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end()); + if (SendingShards.empty() != ReceivingShards.empty()) { + return TConclusionStatus::Fail("incorrect synchronization data (send/receiving lists)"); + } if (ReceivingShards.size() && SendingShards.size()) { if (!locks.HasArbiterColumnShard()) { ArbiterColumnShard = *ReceivingShards.begin(); @@ -328,8 +331,6 @@ class TCommitOperation { return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); } } - } else { - AFL_VERIFY(!SendingShards.size() && !ReceivingShards.size()); } Generation = lock.GetGeneration(); From 0b9f59abba809813d2fbdc7d034dea1c1b8d6c23 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 9 Jan 2025 13:26:20 +0300 Subject: [PATCH 04/13] corrections --- ydb/core/tx/columnshard/columnshard__write.cpp | 9 +++------ .../transactions/operators/ev_write/primary.h | 3 ++- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index badc5e3f020a..85f47a0f71c6 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -319,17 +319,14 @@ class TCommitOperation { return TConclusionStatus::Fail("incorrect synchronization data (send/receiving lists)"); } if (ReceivingShards.size() && SendingShards.size()) { + if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { + return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + } if (!locks.HasArbiterColumnShard()) { ArbiterColumnShard = *ReceivingShards.begin(); - if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); - } } else { ArbiterColumnShard = locks.GetArbiterColumnShard(); AFL_VERIFY(ArbiterColumnShard); - if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); - } } } diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index 8fc19f3c8587..6ec0e70811b1 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -159,7 +159,8 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac }; virtual bool IsTxBroken() const override { - return TxBroken.value_or(false); + AFL_VERIFY(TxBroken); + return *TxBroken; } void InitializeRequests(TColumnShard& owner) { From a28a4245724a010fd26f88edd63d9132b65a3fbe Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 9 Jan 2025 13:28:01 +0300 Subject: [PATCH 05/13] correction --- ydb/core/tx/columnshard/columnshard__write.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 85f47a0f71c6..0545e9368a35 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -326,8 +326,8 @@ class TCommitOperation { ArbiterColumnShard = *ReceivingShards.begin(); } else { ArbiterColumnShard = locks.GetArbiterColumnShard(); - AFL_VERIFY(ArbiterColumnShard); } + AFL_VERIFY(ArbiterColumnShard); } Generation = lock.GetGeneration(); From 831907dc59f35162fd62f9bac65d03d4ad38e94c Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 10 Jan 2025 13:21:43 +0300 Subject: [PATCH 06/13] correction --- ydb/core/tx/columnshard/columnshard__write.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 0545e9368a35..0fe3d752825e 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -320,14 +320,13 @@ class TCommitOperation { } if (ReceivingShards.size() && SendingShards.size()) { if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + return TConclusionStatus::Fail("current tablet_id is absent in sending and receiving lists"); } - if (!locks.HasArbiterColumnShard()) { - ArbiterColumnShard = *ReceivingShards.begin(); - } else { - ArbiterColumnShard = locks.GetArbiterColumnShard(); + AFL_VERIFY(locks.HasArbiterColumnShard()); + ArbiterColumnShard = locks.GetArbiterColumnShard(); + if (!ReceivingShards.contains(*ArbiterColumnShard)) { + return TConclusionStatus::Fail("arbiter is absent in receiving lists"); } - AFL_VERIFY(ArbiterColumnShard); } Generation = lock.GetGeneration(); From bdc45466574cb12e3427c279623b4278f0ee3370 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 10 Jan 2025 13:22:19 +0300 Subject: [PATCH 07/13] fix --- ydb/core/tx/columnshard/columnshard__write.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 0fe3d752825e..35e26ee6b1c8 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -322,7 +322,9 @@ class TCommitOperation { if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { return TConclusionStatus::Fail("current tablet_id is absent in sending and receiving lists"); } - AFL_VERIFY(locks.HasArbiterColumnShard()); + if (!locks.HasArbiterColumnShard()) { + return TConclusionStatus::Fail("no arbiter info in request"); + } ArbiterColumnShard = locks.GetArbiterColumnShard(); if (!ReceivingShards.contains(*ArbiterColumnShard)) { return TConclusionStatus::Fail("arbiter is absent in receiving lists"); From 64614927dc45fbf03b6038861afc10848dfd4aa7 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 10 Jan 2025 14:27:33 +0300 Subject: [PATCH 08/13] fix --- ydb/core/tx/columnshard/columnshard__write.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 35e26ee6b1c8..676179a849ec 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -326,7 +326,7 @@ class TCommitOperation { return TConclusionStatus::Fail("no arbiter info in request"); } ArbiterColumnShard = locks.GetArbiterColumnShard(); - if (!ReceivingShards.contains(*ArbiterColumnShard)) { + if (!ReceivingShards.contains(ArbiterColumnShard)) { return TConclusionStatus::Fail("arbiter is absent in receiving lists"); } } From 65bf25ea0f88a31ed18cf17319e1738cf2200eae Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 10 Jan 2025 16:05:45 +0300 Subject: [PATCH 09/13] additional info --- ydb/core/tx/columnshard/columnshard__write.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 676179a849ec..36ac2a06cb3b 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -327,7 +327,7 @@ class TCommitOperation { } ArbiterColumnShard = locks.GetArbiterColumnShard(); if (!ReceivingShards.contains(ArbiterColumnShard)) { - return TConclusionStatus::Fail("arbiter is absent in receiving lists"); + return TConclusionStatus::Fail("arbiter is absent in receiving lists: " + ::ToString(ArbiterColumnShard) + " in [" + JoinSeq(", ", ReceivingShards) + "]"); } } From 66c417192205be6e07711a55fef9ac7b751b2a1d Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 10 Jan 2025 16:11:32 +0300 Subject: [PATCH 10/13] correct logging --- ydb/core/tx/columnshard/columnshard__write.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 36ac2a06cb3b..c61e7ffafb79 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -327,7 +327,9 @@ class TCommitOperation { } ArbiterColumnShard = locks.GetArbiterColumnShard(); if (!ReceivingShards.contains(ArbiterColumnShard)) { - return TConclusionStatus::Fail("arbiter is absent in receiving lists: " + ::ToString(ArbiterColumnShard) + " in [" + JoinSeq(", ", ReceivingShards) + "]"); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)( + "receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards)); + return TConclusionStatus::Fail("arbiter is absent in receiving lists"); } } From 02bffb6acbb324f3d20775b2b4ec24e25d36c381 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Fri, 10 Jan 2025 17:32:23 +0300 Subject: [PATCH 11/13] fix --- ydb/core/kqp/common/kqp_tx_manager.cpp | 12 ++++++++---- ydb/core/tx/columnshard/columnshard__write.cpp | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index 6c0803d33a7b..f2ddd9d53c03 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -278,17 +278,17 @@ class TKqpTransactionManager : public IKqpTransactionManager { for (auto& [shardId, shardInfo] : ShardsInfo) { if ((shardInfo.Flags & EAction::WRITE)) { ReceivingShards.insert(shardId); + if (shardInfo.IsOlap) { + receivingColumnShardsSet.insert(shardId); + } if (IsVolatile()) { SendingShards.insert(shardId); } - if (shardInfo.IsOlap) { - sendingColumnShardsSet.insert(shardId); - } } if (!shardInfo.Locks.empty()) { SendingShards.insert(shardId); if (shardInfo.IsOlap) { - receivingColumnShardsSet.insert(shardId); + sendingColumnShardsSet.insert(shardId); } } @@ -315,6 +315,8 @@ class TKqpTransactionManager : public IKqpTransactionManager { } } + Cerr << "TEST>> PREPARE :: " << receivingColumnShardsSet.size() << " " << sendingColumnShardsSet.size() << Endl; + if (!receivingColumnShardsSet.empty() || !sendingColumnShardsSet.empty()) { AFL_ENSURE(!IsVolatile()); const auto& shards = receivingColumnShardsSet.empty() @@ -327,6 +329,8 @@ class TKqpTransactionManager : public IKqpTransactionManager { ArbiterColumnShard = *arbiterIterator; } + Cerr << "TEST>> arbiter=" << ArbiterColumnShard << Endl; + ShardsToWaitPrepare = ShardsIds; MinStep = std::numeric_limits::min(); diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index c61e7ffafb79..e1f2fa1f3728 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -326,10 +326,10 @@ class TCommitOperation { return TConclusionStatus::Fail("no arbiter info in request"); } ArbiterColumnShard = locks.GetArbiterColumnShard(); - if (!ReceivingShards.contains(ArbiterColumnShard)) { + if (!IsPrimary() && (!ReceivingShards.contains(ArbiterColumnShard) || !SendingShards.contains(ArbiterColumnShard))) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)( "receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards)); - return TConclusionStatus::Fail("arbiter is absent in receiving lists"); + return TConclusionStatus::Fail("arbiter is absent in sending or receiving lists"); } } From ac6b9866e4eb3c99481c0192b6ecc4d5dd00a2a1 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Fri, 10 Jan 2025 17:35:32 +0300 Subject: [PATCH 12/13] fix --- ydb/core/kqp/common/kqp_tx_manager.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index f2ddd9d53c03..524626a52ffe 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -315,8 +315,6 @@ class TKqpTransactionManager : public IKqpTransactionManager { } } - Cerr << "TEST>> PREPARE :: " << receivingColumnShardsSet.size() << " " << sendingColumnShardsSet.size() << Endl; - if (!receivingColumnShardsSet.empty() || !sendingColumnShardsSet.empty()) { AFL_ENSURE(!IsVolatile()); const auto& shards = receivingColumnShardsSet.empty() @@ -329,8 +327,6 @@ class TKqpTransactionManager : public IKqpTransactionManager { ArbiterColumnShard = *arbiterIterator; } - Cerr << "TEST>> arbiter=" << ArbiterColumnShard << Endl; - ShardsToWaitPrepare = ShardsIds; MinStep = std::numeric_limits::min(); From 4b20b6e29f18af5259e5ad1e3d6f0049e187f32c Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Fri, 10 Jan 2025 17:52:07 +0300 Subject: [PATCH 13/13] fix --- ydb/core/kqp/common/kqp_tx_manager.cpp | 1 + ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 1 + ydb/core/tx/columnshard/columnshard__write.cpp | 6 ++++++ 3 files changed, 8 insertions(+) diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index 524626a52ffe..c6f94262b634 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -325,6 +325,7 @@ class TKqpTransactionManager : public IKqpTransactionManager { auto arbiterIterator = std::begin(shards); std::advance(arbiterIterator, index); ArbiterColumnShard = *arbiterIterator; + ReceivingShards.insert(*ArbiterColumnShard); } ShardsToWaitPrepare = ShardsIds; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index bcb66aa49750..cbd154485695 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2561,6 +2561,7 @@ class TKqpDataExecuter : public TKqpExecuterBase