diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 33aa5f2966f8..4369db8773ed 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -133,7 +133,6 @@ class TKqpDataExecuter : public TKqpExecuterBase sendingShardsSet; absl::flat_hash_set receivingShardsSet; + absl::flat_hash_set sendingColumnShardsSet; absl::flat_hash_set receivingColumnShardsSet; ui64 arbiter = 0; std::optional columnShardArbiter; @@ -2378,6 +2378,10 @@ class TKqpDataExecuter : public TKqpExecuterBaseHasLocks()) { // Locks may be broken so shards with locks need to send readsets sendingShardsSet.insert(shardId); + + if (ShardIdToTableInfo->Get(shardId).IsOlap) { + sendingColumnShardsSet.insert(shardId); + } } if (ShardsWithEffects.contains(shardId)) { // Volatile transactions may abort effects, so they send readsets @@ -2387,7 +2391,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet(shardId).IsOlap) { + if (ShardIdToTableInfo->Get(shardId).IsOlap) { receivingColumnShardsSet.insert(shardId); } } @@ -2445,10 +2449,13 @@ class TKqpDataExecuter : public TKqpExecuterBase(receivingColumnShardsSet.size()); - auto arbiterIterator = std::begin(receivingColumnShardsSet); + if (!receivingColumnShardsSet.empty() || !sendingColumnShardsSet.empty()) { + const auto& shards = receivingColumnShardsSet.empty() + ? sendingColumnShardsSet + : receivingColumnShardsSet; + + const ui32 index = RandomNumber(shards.size()); + auto arbiterIterator = std::begin(shards); std::advance(arbiterIterator, index); columnShardArbiter = *arbiterIterator; } @@ -2857,7 +2864,6 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; TShardIdToTableInfoPtr ShardIdToTableInfo; diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 6ab1792f11a8..cbae2cba2209 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3190,12 +3190,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { { auto result = client.ExecuteQuery(R"( - SELECT * FROM `/Root/DataShard`; - SELECT * FROM `/Root/ColumnShard`; + SELECT * FROM `/Root/DataShard` ORDER BY Col1; + SELECT * FROM `/Root/ColumnShard` ORDER BY Col1; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); - CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1))); + CompareYson(R"([[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13];[101u;"test";101];[102u;"test";101];[103u;"test";101];[104u;"test";101];[1001u;"test";1001];[1002u;"test";1001];[1003u;"test";1001];[1004u;"test";1001]])", + FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[1u;"test";1];[2u;"test";1];[3u;"test";1];[4u;"test";1]])", FormatResultSetYson(result.GetResultSet(1))); } { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index b7fd08454408..2d41e3d05165 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -89,6 +89,8 @@ class TGeneralCompactColumnEngineChanges; namespace NKikimr::NColumnShard { +class TEvWriteCommitPrimaryTransactionOperator; +class TEvWriteCommitSecondaryTransactionOperator; class TTxFinishAsyncTransaction; class TTxInsertTableCleanup; class TTxRemoveSharedBlobs; @@ -138,6 +140,8 @@ class TColumnShard : public TActor , public NTabletFlatExecutor::TTabletExecutedFlat { + friend class TEvWriteCommitSecondaryTransactionOperator; + friend class TEvWriteCommitPrimaryTransactionOperator; friend class TTxInsertTableCleanup; friend class TTxInit; friend class TTxInitSchema; diff --git a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp index 2a9d42b00283..f14e3660150c 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp @@ -57,7 +57,8 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr& /*ta if (OriginalBatch->num_columns() != indexSchema->num_fields()) { AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())( "index", indexSchema->num_fields()); - if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard()) { + if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard() && + WriteData.GetWriteMeta().GetModificationType() != NEvWrite::EModificationType::Delete) { subset = NArrow::TSchemaSubset::AllFieldsAccepted(); const std::vector& columnIdsVector = ActualSchema->GetIndexInfo().GetColumnIds(false); const std::set columnIdsSet(columnIdsVector.begin(), columnIdsVector.end()); diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index 2ddb57cc4b1e..40a2a6586ab4 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -141,7 +141,10 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))( "receive", TabletId); - AFL_VERIFY(op->WaitShardsResultAck.erase(TabletId)); + if (!op->WaitShardsResultAck.erase(TabletId)) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))( + "receive", TabletId); + } op->CheckFinished(*Self); } @@ -188,7 +191,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac if (tabletId && *tabletId != i) { continue; } - NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), + owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), new TEvPipeCache::TEvForward( new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), 0), i, true), IEventHandle::FlagTrackDelivery, GetTxId()); @@ -202,7 +205,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac readSetData.SetDecision(*TxBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT); for (auto&& i : ReceivingShards) { if (WaitShardsResultAck.contains(i)) { - NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), + owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(TxInfo.PlanStep, GetTxId(), owner.TabletID(), i, owner.TabletID(), readSetData.SerializeAsString()), i, true), diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h index ae0224057b46..ae249b07995f 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h @@ -136,7 +136,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans } void SendBrokenFlagAck(TColumnShard& owner) { - NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), + owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), new TEvPipeCache::TEvForward( new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), 0), ArbiterTabletId, true), IEventHandle::FlagTrackDelivery, GetTxId()); @@ -145,7 +145,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans void SendResult(TColumnShard& owner) { NKikimrTx::TReadSetData readSetData; readSetData.SetDecision(SelfBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT); - NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), + owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet( 0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), readSetData.SerializeAsString()), ArbiterTabletId, true),