diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index 50ac4a55fcac..6e446d164a1d 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -207,7 +207,6 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig } if (txCtx.HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { - YQL_ENSURE(txCtx.EnableImmediateEffects); return true; } diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index ae355f3dc36d..74a543b4022a 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -168,8 +168,8 @@ using TShardIdToTableInfoPtr = std::shared_ptr; class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { public: explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry, - TIntrusivePtr timeProvider, TIntrusivePtr randomProvider, bool enableImmediateEffects) - : NYql::TKikimrTransactionContextBase(enableImmediateEffects) + TIntrusivePtr timeProvider, TIntrusivePtr randomProvider) + : NYql::TKikimrTransactionContextBase() , Implicit(implicit) , ParamsState(MakeIntrusive()) { @@ -268,7 +268,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { } bool ShouldExecuteDeferredEffects() const { - if (HasUncommittedChangesRead) { + if (HasUncommittedChangesRead || HasOlapTable) { YQL_ENSURE(EnableImmediateEffects); return !DeferredEffects.Empty(); } @@ -298,7 +298,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { } bool CanDeferEffects() const { - if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { + if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) { YQL_ENSURE(EnableImmediateEffects); return false; } @@ -379,7 +379,7 @@ struct THash { }; namespace NKikimr::NKqp { - + class TTransactionsCache { size_t MaxActiveSize; THashMap, THash> Active; diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 53d81f5d8ce8..057a42e9671c 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -587,7 +587,6 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableKqpScanQueryStreamLookup = serviceConfig.GetEnableKqpScanQueryStreamLookup(); kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin(); kqpConfig.EnableKqpDataQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin(); - kqpConfig.EnableKqpImmediateEffects = serviceConfig.GetEnableKqpImmediateEffects(); kqpConfig.EnablePreparedDdl = serviceConfig.GetEnablePreparedDdl(); kqpConfig.EnableSequences = serviceConfig.GetEnableSequences(); kqpConfig.EnableColumnsWithDefault = serviceConfig.GetEnableColumnsWithDefault(); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 9371e41ddf26..ecceb4c40021 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -513,13 +513,11 @@ class TKqpCompileService : public TActorBootstrapped { bool enableKqpDataQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin(); bool enableKqpScanQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin(); - bool enableKqpDataQuerySourceRead = TableServiceConfig.GetEnableKqpDataQuerySourceRead(); bool enableKqpScanQuerySourceRead = TableServiceConfig.GetEnableKqpScanQuerySourceRead(); bool predicateExtract20 = TableServiceConfig.GetPredicateExtract20(); bool defaultSyntaxVersion = TableServiceConfig.GetSqlVersion(); - bool enableKqpImmediateEffects = TableServiceConfig.GetEnableKqpImmediateEffects(); auto indexAutoChooser = TableServiceConfig.GetIndexAutoChooseMode(); @@ -556,10 +554,8 @@ class TKqpCompileService : public TActorBootstrapped { TableServiceConfig.GetEnableKqpScanQueryStreamLookup() != enableKqpScanQueryStreamLookup || TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin() != enableKqpScanQueryStreamIdxLookupJoin || TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin() != enableKqpDataQueryStreamIdxLookupJoin || - TableServiceConfig.GetEnableKqpDataQuerySourceRead() != enableKqpDataQuerySourceRead || TableServiceConfig.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead || TableServiceConfig.GetPredicateExtract20() != predicateExtract20 || - TableServiceConfig.GetEnableKqpImmediateEffects() != enableKqpImmediateEffects || TableServiceConfig.GetIndexAutoChooseMode() != indexAutoChooser || TableServiceConfig.GetEnableSequences() != enableSequences || TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault || diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 40b34dde0c3e..9f0c8ec59f94 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -3,32 +3,31 @@ #include "kqp_locks_helper.h" #include "kqp_partition_helper.h" #include "kqp_planner.h" +#include "kqp_shards_resolver.h" #include "kqp_table_resolver.h" #include "kqp_tasks_validate.h" -#include "kqp_shards_resolver.h" #include #include -#include #include +#include +#include #include #include -#include -#include -#include #include +#include +#include #include #include #include #include #include -#include +#include #include #include #include - namespace NKikimr { namespace NKqp { @@ -43,7 +42,7 @@ static constexpr TDuration MaxReattachDelay = TDuration::MilliSeconds(100); static constexpr TDuration MaxReattachDuration = TDuration::Seconds(4); static constexpr ui32 ReplySizeLimit = 48 * 1024 * 1024; // 48 MB -class TKqpDataExecuter : public TKqpExecuterBase { +class TKqpDataExecuter: public TKqpExecuterBase { using TBase = TKqpExecuterBase; using TKqpSnapshot = IKqpGateway::TKqpSnapshot; @@ -93,8 +92,8 @@ class TKqpDataExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, - const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, + ui32 statementResultIndex, const std::optional& federatedQuerySetup, + const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo) + : TBase(std::move(request), database, userToken, counters, tableServiceConfig, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) - , UseEvWrite(useEvWrite) + , UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink()) + , HtapTx(tableServiceConfig.GetEnableHtapTx()) , FederatedQuerySetup(federatedQuerySetup) , GUCSettings(GUCSettings) , ShardIdToTableInfo(shardIdToTableInfo) - , HtapTx(htapTx) { Target = creator; @@ -156,8 +153,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPendingComputeActors().size() : 0) << " compute actor(s) and " - << notFinished << " datashard(s): "; + auto sb = TStringBuilder() << "ActorState: " << CurrentStateFuncName() << ", waiting for " + << (Planner ? Planner->GetPendingComputeActors().size() : 0) << " compute actor(s) and " << notFinished + << " datashard(s): "; if (Planner) { for (const auto& shardId : Planner->GetPendingComputeActors()) { sb << "CA " << shardId.first << ", "; @@ -183,13 +180,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseEnableMvccSnapshotWithLegacyDomainRoot) - ); + const bool forceSnapshot = (ReadOnlyTx && !ImmediateTx && !HasPersistentChannels && !HasOlapTable && + (!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot)); return forceSnapshot; } @@ -203,8 +195,7 @@ class TKqpDataExecuter : public TKqpExecuterBase (int)ReplySizeLimit) { TString message; if (ResponseEv->TxResults.size() == 1 && !ResponseEv->TxResults[0].QueryResultIndex.Defined()) { - message = TStringBuilder() << "Intermediate data materialization exceeded size limit" - << " (" << resultSize << " > " << ReplySizeLimit << ")." - << " This usually happens when trying to write large amounts of data or to perform lookup" - << " by big collection of keys in single query. Consider using smaller batches of data."; + message = TStringBuilder() << "Intermediate data materialization exceeded size limit" << " (" << resultSize << " > " + << ReplySizeLimit << ")." + << " This usually happens when trying to write large amounts of data or to perform lookup" + << " by big collection of keys in single query. Consider using smaller batches of data."; } else { - message = TStringBuilder() << "Query result size limit exceeded. (" - << resultSize << " > " << ReplySizeLimit << ")"; + message = TStringBuilder() << "Query result size limit exceeded. (" << resultSize << " > " << ReplySizeLimit << ")"; } auto issue = YqlIssue({}, TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, message); @@ -309,7 +299,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record; const ui64 tabletId = event.GetOrigin(); - LOG_D("Got propose result" << - ", PQ tablet: " << tabletId << - ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); + LOG_D("Got propose result" << ", PQ tablet: " << tabletId + << ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); TShardState* state = ShardStates.FindPtr(tabletId); YQL_ENSURE(state, "Unexpected propose result from unknown PQ tablet " << tabletId); switch (event.GetStatus()) { - case NKikimrPQ::TEvProposeTransactionResult::PREPARED: - if (!ShardPrepared(*state, event)) { - return CancelProposal(tabletId); - } - return CheckPrepareCompleted(); - case NKikimrPQ::TEvProposeTransactionResult::COMPLETE: - YQL_ENSURE(false); - default: - CancelProposal(tabletId); - return PQTabletError(event); + case NKikimrPQ::TEvProposeTransactionResult::PREPARED: + if (!ShardPrepared(*state, event)) { + return CancelProposal(tabletId); + } + return CheckPrepareCompleted(); + case NKikimrPQ::TEvProposeTransactionResult::COMPLETE: + YQL_ENSURE(false); + default: + CancelProposal(tabletId); + return PQTabletError(event); } } @@ -403,9 +392,9 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetStatus()) - << ", error: " << res->GetError()); + LOG_D("Got propose result, shard: " << shardId + << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) + << ", error: " << res->GetError()); if (Stats) { Stats->AddDatashardPrepareStats(std::move(*res->Record.MutableTxStats())); @@ -434,28 +423,24 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetStatus()) - << ", error: " << res->Record.GetStatusMessage()); + LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxColumnShard::EResultStatus_Name(res->Record.GetStatus()) + << ", error: " << res->Record.GetStatusMessage()); -// if (Stats) { -// Stats->AddDatashardPrepareStats(std::move(*res->Record.MutableTxStats())); -// } + // if (Stats) { + // Stats->AddDatashardPrepareStats(std::move(*res->Record.MutableTxStats())); + // } switch (res->Record.GetStatus()) { - case NKikimrTxColumnShard::EResultStatus::PREPARED: - { + case NKikimrTxColumnShard::EResultStatus::PREPARED: { if (!ShardPrepared(*shardState, res->Record)) { return CancelProposal(shardId); } return CheckPrepareCompleted(); } - case NKikimrTxColumnShard::EResultStatus::SUCCESS: - { + case NKikimrTxColumnShard::EResultStatus::SUCCESS: { YQL_ENSURE(false); } - default: - { + default: { CancelProposal(shardId); return ShardError(res->Record); } @@ -472,19 +457,18 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); - LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId - << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) - << ", TxId=" << ev->Get()->Record.GetTxId() - << ", Locks= " << [&]() { - TStringBuilder builder; - for (const auto& lock : ev->Get()->Record.GetTxLocks()) { - builder << lock.ShortDebugString(); - } - return builder; - }() - << ", Cookie=" << ev->Cookie - << ", error=" << issues.ToString()); - + LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId << ", Status=" + << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) + << ", TxId=" << ev->Get()->Record.GetTxId() << ", Locks= " << + [&]() { + TStringBuilder builder; + for (const auto& lock : ev->Get()->Record.GetTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }() << ", Cookie=" << ev->Cookie + << ", error=" << issues.ToString()); + if (Stats) { Stats->AddDatashardPrepareStats(std::move(*res->Record.MutableTxStats())); } @@ -510,15 +494,13 @@ class TKqpDataExecuter : public TKqpExecuterBaseBrokenLockShardId = shardId; if (!res->Record.GetTxLocks().empty()) { - ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( - res->Record.GetTxLocks(0).GetSchemeShard(), - res->Record.GetTxLocks(0).GetPathId()); + ResponseEv->BrokenLockPathId = + NYql::TKikimrPathId(res->Record.GetTxLocks(0).GetSchemeShard(), res->Record.GetTxLocks(0).GetPathId()); } ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); return; } - default: - { + default: { return ShardError(res->Record); } } @@ -580,8 +562,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseTabletId << " propose error, notDelivered: " << msg->NotDelivered - << ", notPrepared: " << notPrepared << ", wasRestart: " << wasRestarting); + LOG_I("Shard " << msg->TabletId << " propose error, notDelivered: " << msg->NotDelivered << ", notPrepared: " << notPrepared + << ", wasRestart: " << wasRestarting); if (notPrepared) { CancelProposal(msg->TabletId); @@ -605,10 +587,9 @@ class TKqpDataExecuter : public TKqpExecuterBaseReattachState.Reattaching) && - shardState->ReattachState.ShouldReattach(TlsActivationContext->Now())) - { - LOG_N("Shard " << msg->TabletId << " delivery problem (already prepared, reattaching in " - << shardState->ReattachState.Delay << ")"); + shardState->ReattachState.ShouldReattach(TlsActivationContext->Now())) { + LOG_N("Shard " << msg->TabletId << " delivery problem (already prepared, reattaching in " << shardState->ReattachState.Delay + << ")"); Schedule(shardState->ReattachState.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId)); ++shardState->RestartCount; @@ -616,7 +597,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseTabletId << " delivery problem (already prepared)" - << (msg->NotDelivered ? ", last message not delivered" : "")); + << (msg->NotDelivered ? ", last message not delivered" : "")); CancelProposal(0); return ReplyTxStateUnknown(msg->TabletId); @@ -640,11 +621,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseFollower); - Send(MakePipePerNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward( - new TEvDataShard::TEvCancelTransactionProposal(TxId), shardId, /* subscribe */ false)); + Send(MakePipePerNodeCacheID(/* allowFollowers */ false), + new TEvPipeCache::TEvForward(new TEvDataShard::TEvCancelTransactionProposal(TxId), shardId, /* subscribe */ false)); } } } - template + template bool ShardPreparedImpl(TShardState& state, const E& result) { YQL_ENSURE(state.State == TShardState::EState::Preparing); state.State = TShardState::EState::Prepared; @@ -669,8 +647,7 @@ class TKqpDataExecuter : public TKqpExecuterBase(result.GetDomainCoordinators().begin(), - result.GetDomainCoordinators().end())); + auto domainCoordinators = TCoordinators(TVector(result.GetDomainCoordinators().begin(), result.GetDomainCoordinators().end())); coordinator = domainCoordinators.Select(TxId); } @@ -679,13 +656,13 @@ class TKqpDataExecuter : public TKqpExecuterBaseTxProxyMon->TxResultAborted->Inc(); - ReplyErrorAndDie(Ydb::StatusIds::CANCELLED, MakeIssue( - NKikimrIssues::TIssuesIds::TX_DECLINED_IMPLICIT_COORDINATOR, "Unable to choose coordinator.")); + ReplyErrorAndDie(Ydb::StatusIds::CANCELLED, + MakeIssue(NKikimrIssues::TIssuesIds::TX_DECLINED_IMPLICIT_COORDINATOR, "Unable to choose coordinator.")); return false; } @@ -721,7 +698,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseCounters->DataShardTxReplySizeExceededError->Inc(); } @@ -791,8 +768,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseTxProxyMon->TxResultShardOverloaded->Inc(); auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OVERLOADED); AddColumnShardErrors(result, issue); return ReplyErrorAndDie(Ydb::StatusIds::OVERLOADED, issue); } - case NKikimrTxColumnShard::EResultStatus::ABORTED: - { + case NKikimrTxColumnShard::EResultStatus::ABORTED: { Counters->TxProxyMon->TxResultAborted->Inc(); auto issue = YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_ABORTED); AddColumnShardErrors(result, issue); return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issue); } - case NKikimrTxColumnShard::EResultStatus::TIMEOUT: - { + case NKikimrTxColumnShard::EResultStatus::TIMEOUT: { Counters->TxProxyMon->TxResultShardTryLater->Inc(); auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE); AddColumnShardErrors(result, issue); return ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue); } - case NKikimrTxColumnShard::EResultStatus::ERROR: - { + case NKikimrTxColumnShard::EResultStatus::ERROR: { Counters->TxProxyMon->TxResultError->Inc(); auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE); AddColumnShardErrors(result, issue); return ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue); } - default: - { + default: { Counters->TxProxyMon->TxResultFatal->Inc(); - auto issue = YqlIssue({}, TIssuesIds::DEFAULT_ERROR, "Error executing transaction: transaction failed." + NKikimrTxColumnShard::EResultStatus_Name(result.GetStatus())); + auto issue = YqlIssue({}, TIssuesIds::DEFAULT_ERROR, + "Error executing transaction: transaction failed." + NKikimrTxColumnShard::EResultStatus_Name(result.GetStatus())); AddColumnShardErrors(result, issue); return ReplyErrorAndDie(Ydb::StatusIds::GENERIC_ERROR, issue); } @@ -903,31 +876,31 @@ class TKqpDataExecuter : public TKqpExecuterBase 0) { - sizeLimit = sizeLimit - ? std::min(sizeLimit, Request.TotalReadSizeLimitBytes) - : Request.TotalReadSizeLimitBytes; + sizeLimit = sizeLimit ? std::min(sizeLimit, Request.TotalReadSizeLimitBytes) : Request.TotalReadSizeLimitBytes; } if (totalReadSize > sizeLimit) { auto msg = TStringBuilder() << "Transaction total read size " << totalReadSize << " exceeded limit " << sizeLimit; LOG_N(msg); - ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, msg)); + ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, YqlIssue({}, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, msg)); return; } @@ -1056,9 +1026,9 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record; - LOG_D("Got propose result" << - ", topic tablet: " << event.GetOrigin() << - ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); + LOG_D("Got propose result" << ", topic tablet: " << event.GetOrigin() + << ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); - TShardState *state = ShardStates.FindPtr(event.GetOrigin()); + TShardState* state = ShardStates.FindPtr(event.GetOrigin()); YQL_ENSURE(state); switch (event.GetStatus()) { @@ -1112,40 +1081,36 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetStatus()) - << ", error: " << res->Record.GetStatusMessage()); + LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxColumnShard::EResultStatus_Name(res->Record.GetStatus()) + << ", error: " << res->Record.GetStatusMessage()); -// if (Stats) { -// Stats->AddDatashardStats(std::move(*res->Record.MutableComputeActorStats()), -// std::move(*res->Record.MutableTxStats())); -// } + // if (Stats) { + // Stats->AddDatashardStats(std::move(*res->Record.MutableComputeActorStats()), + // std::move(*res->Record.MutableTxStats())); + // } switch (res->Record.GetStatus()) { - case NKikimrTxColumnShard::EResultStatus::SUCCESS: - { + case NKikimrTxColumnShard::EResultStatus::SUCCESS: { YQL_ENSURE(shardState->State == TShardState::EState::Executing); shardState->State = TShardState::EState::Finished; Counters->TxProxyMon->ResultsReceivedCount->Inc(); -// Counters->TxProxyMon->ResultsReceivedSize->Add(res->GetTxResult().size()); + // Counters->TxProxyMon->ResultsReceivedSize->Add(res->GetTxResult().size()); -// for (auto& lock : res->Record.GetTxLocks()) { -// LOG_D("Shard " << shardId << " completed, store lock " << lock.ShortDebugString()); -// Locks.emplace_back(std::move(lock)); -// } + // for (auto& lock : res->Record.GetTxLocks()) { + // LOG_D("Shard " << shardId << " completed, store lock " << lock.ShortDebugString()); + // Locks.emplace_back(std::move(lock)); + // } Counters->TxProxyMon->TxResultComplete->Inc(); CheckExecutionComplete(); return; } - case NKikimrTxColumnShard::EResultStatus::PREPARED: - { + case NKikimrTxColumnShard::EResultStatus::PREPARED: { YQL_ENSURE(false); } - default: - { + default: { return ShardError(res->Record); } } @@ -1162,18 +1127,17 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); - LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId - << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) - << ", TxId=" << ev->Get()->Record.GetTxId() - << ", Locks= " << [&]() { - TStringBuilder builder; - for (const auto& lock : ev->Get()->Record.GetTxLocks()) { - builder << lock.ShortDebugString(); - } - return builder; - }() - << ", Cookie=" << ev->Cookie - << ", error=" << issues.ToString()); + LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId << ", Status=" + << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) + << ", TxId=" << ev->Get()->Record.GetTxId() << ", Locks= " << + [&]() { + TStringBuilder builder; + for (const auto& lock : ev->Get()->Record.GetTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }() << ", Cookie=" << ev->Cookie + << ", error=" << issues.ToString()); if (Stats) { Stats->AddDatashardStats(std::move(*res->Record.MutableTxStats())); @@ -1205,16 +1169,14 @@ class TKqpDataExecuter : public TKqpExecuterBaseBrokenLockShardId = shardId; if (!res->Record.GetTxLocks().empty()) { - ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( - res->Record.GetTxLocks(0).GetSchemeShard(), - res->Record.GetTxLocks(0).GetPathId()); + ResponseEv->BrokenLockPathId = + NYql::TKikimrPathId(res->Record.GetTxLocks(0).GetSchemeShard(), res->Record.GetTxLocks(0).GetPathId()); ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); } CheckExecutionComplete(); return; } - default: - { + default: { return ShardError(res->Record); } } @@ -1229,14 +1191,12 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetStatus()) - << ", error: " << res->GetError()); + LOG_D("Got propose result, shard: " << shardId + << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) + << ", error: " << res->GetError()); if (Stats) { - Stats->AddDatashardStats( - std::move(*res->Record.MutableComputeActorStats()), - std::move(*res->Record.MutableTxStats()), + Stats->AddDatashardStats(std::move(*res->Record.MutableComputeActorStats()), std::move(*res->Record.MutableTxStats()), TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())); } @@ -1269,9 +1229,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseBrokenLockShardId = shardId; if (!res->Record.GetTxLocks().empty()) { - ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( - res->Record.GetTxLocks(0).GetSchemeShard(), - res->Record.GetTxLocks(0).GetPathId()); + ResponseEv->BrokenLockPathId = + NYql::TKikimrPathId(res->Record.GetTxLocks(0).GetSchemeShard(), res->Record.GetTxLocks(0).GetPathId()); return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); } @@ -1295,7 +1254,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseState) - << ", txPlanned: " << TxPlanned); + << ", txPlanned: " << TxPlanned); switch (shardState->State) { case TShardState::EState::Preparing: @@ -1355,12 +1314,12 @@ class TKqpDataExecuter : public TKqpExecuterBaseReattachState.Cookie); + Send(MakePipePerNodeCacheID(false), + new TEvPipeCache::TEvForward(new TEvDataShard::TEvProposeTransactionAttach(tabletId, TxId), tabletId, /* subscribe */ true), 0, + ++shardState->ReattachState.Cookie); } - void HandleExecute(TEvTxProxy::TEvProposeTransactionStatus::TPtr &ev) { + void HandleExecute(TEvTxProxy::TEvProposeTransactionStatus::TPtr& ev) { TEvTxProxy::TEvProposeTransactionStatus* res = ev->Get(); LOG_D("Got transaction status, status: " << res->GetStatus()); @@ -1400,8 +1359,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet(); - LOG_D("DeliveryProblem to shard " << msg->TabletId << ", notDelivered: " << msg->NotDelivered - << ", txPlanned: " << TxPlanned << ", coordinator: " << TxCoordinator); + LOG_D("DeliveryProblem to shard " << msg->TabletId << ", notDelivered: " << msg->NotDelivered << ", txPlanned: " << TxPlanned + << ", coordinator: " << TxCoordinator); if (msg->TabletId == TxCoordinator) { if (msg->NotDelivered) { @@ -1426,17 +1385,16 @@ class TKqpDataExecuter : public TKqpExecuterBaseState) { case TShardState::EState::Prepared: // is it correct? - LOG_E("DeliveryProblem to shard " << msg->TabletId << ", notDelivered: " << msg->NotDelivered - << ", txPlanned: " << TxPlanned << ", coordinator: " << TxCoordinator); + LOG_E("DeliveryProblem to shard " << msg->TabletId << ", notDelivered: " << msg->NotDelivered << ", txPlanned: " << TxPlanned + << ", coordinator: " << TxCoordinator); Y_DEBUG_ABORT_UNLESS(false); // Proceed with query processing [[fallthrough]]; case TShardState::EState::Executing: { if ((wasRestarting || shardState->ReattachState.Reattaching) && - shardState->ReattachState.ShouldReattach(TlsActivationContext->Now())) - { - LOG_N("Shard " << msg->TabletId << " lost pipe while waiting for reply (reattaching in " - << shardState->ReattachState.Delay << ")"); + shardState->ReattachState.ShouldReattach(TlsActivationContext->Now())) { + LOG_N("Shard " << msg->TabletId << " lost pipe while waiting for reply (reattaching in " << shardState->ReattachState.Delay + << ")"); Schedule(shardState->ReattachState.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId)); ++shardState->RestartCount; @@ -1444,7 +1402,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseTabletId << " lost pipe while waiting for reply" - << (msg->NotDelivered ? " (last message not delivered)" : "")); + << (msg->NotDelivered ? " (last message not delivered)" : "")); return ReplyTxStateUnknown(msg->TabletId); } @@ -1487,7 +1445,7 @@ class TKqpDataExecuter : public TKqpExecuterBase TTask& { - YQL_ENSURE(!UseEvWrite); + YQL_ENSURE(!UseEvWriteForOltp); auto it = shardTasks.find(shardId); if (it != shardTasks.end()) { return TasksGraph.GetTask(it->second); @@ -1526,8 +1484,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAddUpdateOp(); } - } ShardsWithEffects.insert(task.Meta.ShardId); @@ -1592,8 +1548,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseDebugString()); + YQL_ENSURE(false, + "The previous check did not work! Data query read does not support column shard tables." << Endl << this->DebugString()); } default: { - YQL_ENSURE(false, "Unexpected table operation: " << (ui32) op.GetTypeCase() << Endl - << this->DebugString()); + YQL_ENSURE(false, "Unexpected table operation: " << (ui32)op.GetTypeCase() << Endl << this->DebugString()); } } } @@ -1618,16 +1572,14 @@ class TKqpDataExecuter : public TKqpExecuterBaseTypeRegistry)); + LOG_D("ActorState: " << CurrentStateFuncName() << ", stage: " << stageInfo.Id << " create datashard task: " << shardTask.second + << ", shard: " << shardTask.first << ", meta: " << task.Meta.ToString(keyTypes, *AppData()->TypeRegistry)); } } void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap) { - YQL_ENSURE(!UseEvWrite); + YQL_ENSURE(!UseEvWriteForOltp); TShardState shardState; shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing; shardState.DatashardState.ConstructInPlace(); @@ -1667,43 +1619,25 @@ class TKqpDataExecuter : public TKqpExecuterBaseShardReadLocks = locksCount > 0; - LOG_D("State: " << CurrentStateFuncName() - << ", Executing KQP transaction on shard: " << shardId - << ", tasks: [" << JoinStrings(shardState.TaskIds.begin(), shardState.TaskIds.end(), ",") << "]" - << ", lockTxId: " << lockTxId - << ", locks: " << dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString()); + LOG_D("State: " << CurrentStateFuncName() << ", Executing KQP transaction on shard: " << shardId << ", tasks: [" + << JoinStrings(shardState.TaskIds.begin(), shardState.TaskIds.end(), ",") << "]" << ", lockTxId: " << lockTxId + << ", locks: " << dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString()); std::unique_ptr ev; if (isOlap) { - const ui32 flags = - (ImmediateTx ? NKikimrTxColumnShard::ETransactionFlag::TX_FLAG_IMMEDIATE: 0); + const ui32 flags = (ImmediateTx ? NKikimrTxColumnShard::ETransactionFlag::TX_FLAG_IMMEDIATE : 0); ev.reset(new TEvColumnShard::TEvProposeTransaction( - NKikimrTxColumnShard::TX_KIND_DATA, - SelfId(), - TxId, - dataTransaction.SerializeAsString(), - flags)); + NKikimrTxColumnShard::TX_KIND_DATA, SelfId(), TxId, dataTransaction.SerializeAsString(), flags)); } else { const ui32 flags = - (ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) | - (VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0); + (ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) | (VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0); std::unique_ptr evData; if (GetSnapshot().IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) { - evData.reset(new TEvDataShard::TEvProposeTransaction( - NKikimrTxDataShard::TX_KIND_DATA, - SelfId(), - TxId, - dataTransaction.SerializeAsString(), - GetSnapshot().Step, - GetSnapshot().TxId, - flags)); + evData.reset(new TEvDataShard::TEvProposeTransaction(NKikimrTxDataShard::TX_KIND_DATA, SelfId(), TxId, + dataTransaction.SerializeAsString(), GetSnapshot().Step, GetSnapshot().TxId, flags)); } else { evData.reset(new TEvDataShard::TEvProposeTransaction( - NKikimrTxDataShard::TX_KIND_DATA, - SelfId(), - TxId, - dataTransaction.SerializeAsString(), - flags)); + NKikimrTxDataShard::TX_KIND_DATA, SelfId(), TxId, dataTransaction.SerializeAsString(), flags)); } ResponseEv->Orbit.Fork(evData->Orbit); ev = std::move(evData); @@ -1725,16 +1659,15 @@ class TKqpDataExecuter : public TKqpExecuterBase(); evWriteTransaction->Record = evWrite; - evWriteTransaction->Record.SetTxMode(ImmediateTx ? NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE : NKikimrDataEvents::TEvWrite::MODE_PREPARE); + evWriteTransaction->Record.SetTxMode( + ImmediateTx ? NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE : NKikimrDataEvents::TEvWrite::MODE_PREPARE); evWriteTransaction->Record.SetTxId(TxId); auto locksCount = evWriteTransaction->Record.GetLocks().LocksSize(); shardState.DatashardState->ShardReadLocks = locksCount > 0; - LOG_D("State: " << CurrentStateFuncName() - << ", Executing EvWrite (PREPARE) on shard: " << shardId - << ", TxId: " << TxId - << ", locks: " << evWriteTransaction->Record.GetLocks().ShortDebugString()); + LOG_D("State: " << CurrentStateFuncName() << ", Executing EvWrite (PREPARE) on shard: " << shardId << ", TxId: " << TxId + << ", locks: " << evWriteTransaction->Record.GetLocks().ShortDebugString()); auto traceId = ExecuterSpan.GetTraceId(); @@ -1746,20 +1679,19 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Description.Status == Ydb::StatusIds::SUCCESS, "failed to get secrets snapshot with issues: " << ev->Get()->Description.Issues.ToOneLineString()); + YQL_ENSURE(ev->Get()->Description.Status == Ydb::StatusIds::SUCCESS, + "failed to get secrets snapshot with issues: " << ev->Get()->Description.Issues.ToOneLineString()); for (size_t i = 0; i < SecretNames.size(); ++i) { SecureParams.emplace(SecretNames[i], ev->Get()->Description.SecretValues[i]); @@ -1798,7 +1731,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Status == Ydb::StatusIds::SUCCESS, "failed to save script external effect with issues: " << ev->Get()->Issues.ToOneLineString()); + YQL_ENSURE(ev->Get()->Status == Ydb::StatusIds::SUCCESS, + "failed to save script external effect with issues: " << ev->Get()->Issues.ToOneLineString()); SaveScriptExternalEffectRequired = false; if (!WaitRequired()) { @@ -1808,10 +1742,8 @@ class TKqpDataExecuter : public TKqpExecuterBase( - requestContext->CurrentExecutionId, requestContext->Database, - requestContext->CustomerSuppliedId, UserToken ? UserToken->GetUserSID() : "" - ); + auto scriptExternalEffect = std::make_unique(requestContext->CurrentExecutionId, + requestContext->Database, requestContext->CustomerSuppliedId, UserToken ? UserToken->GetUserSID() : ""); for (const auto& transaction : Request.Transactions) { for (const auto& secretName : transaction.Body->GetSecretNames()) { SecretSnapshotRequired = true; @@ -1859,7 +1791,7 @@ class TKqpDataExecuter : public TKqpExecuterBase& tables) { + bool HasOlapSink(const NKqpProto::TKqpPhyStage& stage, const google::protobuf::RepeatedPtrField<::NKqpProto::TKqpPhyTable>& tables) { return NKqp::HasOlapTableWriteInStage(stage, tables); } @@ -1886,17 +1818,17 @@ class TKqpDataExecuter : public TKqpExecuterBase error; if (stageInfo.Meta.ShardKey->RowOperation != TKeyDesc::ERowOperation::Read) { - error = TStringBuilder() << "Non-read operations can't be performed on async index table" - << ": " << stageInfo.Meta.ShardKey->TableId; + error = TStringBuilder() << "Non-read operations can't be performed on async index table" << ": " + << stageInfo.Meta.ShardKey->TableId; } else if (Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_READ_STALE) { - error = TStringBuilder() << "Read operation can be performed on async index table" - << ": " << stageInfo.Meta.ShardKey->TableId << " only with StaleRO isolation level"; + error = TStringBuilder() << "Read operation can be performed on async index table" << ": " + << stageInfo.Meta.ShardKey->TableId << " only with StaleRO isolation level"; } if (error) { LOG_E(*error); - ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, *error)); + ReplyErrorAndDie( + Ydb::StatusIds::PRECONDITION_FAILED, YqlIssue({}, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, *error)); return; } } @@ -1904,8 +1836,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetType(), stage))) { auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables."; LOG_E(error); - ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, error)); + ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, YqlIssue({}, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, error)); return; } @@ -1914,8 +1845,7 @@ class TKqpDataExecuter : public TKqpExecuterBase 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - if (auto partitionsCount = BuildScanTasksFromSource( - stageInfo, + if (auto partitionsCount = BuildScanTasksFromSource(stageInfo, /* shardsResolved */ StreamResult, /* limitTasksPerNode */ StreamResult)) { sourceScanPartitionsCount += *partitionsCount; @@ -1957,8 +1887,8 @@ class TKqpDataExecuter : public TKqpExecuterBase> datashardTasks; // shardId -> [task] - THashMap> remoteComputeTasks; // shardId -> [task] + THashMap> datashardTasks; // shardId -> [task] + THashMap> remoteComputeTasks; // shardId -> [task] TVector computeTasks; if (StreamResult) { @@ -1986,7 +1916,7 @@ class TKqpDataExecuter : public TKqpExecuterBase Request.MaxComputeActors) { LOG_N("Too many compute actors: " << computeTasks.size()); - ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, - YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() - << "Requested too many execution units: " << computeTasks.size())); + ReplyErrorAndDie( + Ydb::StatusIds::PRECONDITION_FAILED, YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, + TStringBuilder() << "Requested too many execution units: " << computeTasks.size())); return; } ui32 shardsLimit = Request.MaxAffectedShards; - if (i64 msc = (i64) Request.MaxShardCount; msc > 0) { - shardsLimit = std::min(shardsLimit, (ui32) msc); + if (i64 msc = (i64)Request.MaxShardCount; msc > 0) { + shardsLimit = std::min(shardsLimit, (ui32)msc); } size_t shards = datashardTasks.size() + sourceScanPartitionsCount; if (shardsLimit > 0 && shards > shardsLimit) { LOG_W("Too many affected shards: datashardTasks=" << shards << ", limit: " << shardsLimit); Counters->TxProxyMon->TxResultError->Inc(); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, - YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() - << "Affected too many shards: " << datashardTasks.size())); + YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() << "Affected too many shards: " << datashardTasks.size())); return; } - bool fitSize = AllOf(datashardTasks, [this](const auto& x){ return ValidateTaskSize(x.second); }); + bool fitSize = AllOf(datashardTasks, [this](const auto& x) { + return ValidateTaskSize(x.second); + }); if (!fitSize) { Counters->TxProxyMon->TxResultError->Inc(); return; @@ -2025,13 +1956,10 @@ class TKqpDataExecuter : public TKqpExecuterBase shardIds; @@ -2114,7 +2044,8 @@ class TKqpDataExecuter : public TKqpExecuterBase 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) { const auto& source = stage.GetSources(0).GetReadRangesSource(); bool isFullScan; - SourceScanStageIdToParititions[stageInfo.Id] = PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv(), isFullScan); + SourceScanStageIdToParititions[stageInfo.Id] = + PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv(), isFullScan); if (isFullScan && !source.HasItemsLimit()) { Counters->Counters->FullScansExecuted->Inc(); } @@ -2127,9 +2058,9 @@ class TKqpDataExecuter : public TKqpExecuterBaseSelfId(), TxId, false, std::move(shardIds)); + ExecuterStateSpan = NWilson::TSpan( + TWilsonKqp::ExecuterShardsResolve, ExecuterSpan.GetTraceId(), "WaitForShardsResolve", NWilson::EFlags::AUTO_END); + auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, false, std::move(shardIds)); KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver); return; } else if (HasOlapTable) { @@ -2186,8 +2117,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record; - LOG_T("read snapshot result: " << record.GetStatus() << ", step: " << record.GetSnapshotStep() - << ", tx id: " << record.GetSnapshotTxId()); + LOG_T( + "read snapshot result: " << record.GetStatus() << ", step: " << record.GetSnapshotStep() << ", tx id: " << record.GetSnapshotTxId()); if (record.GetStatus() != Ydb::StatusIds::SUCCESS) { ExecuterStateSpan.EndError(TStringBuilder() << Ydb::StatusIds::StatusCode_Name(record.GetStatus())); @@ -2217,30 +2148,23 @@ class TKqpDataExecuter : public TKqpExecuterBase>& datashardTasks, - TDatashardTxs& datashardTxs, - TEvWriteTxs& evWriteTxs, - TTopicTabletTxs& topicTxs) { - for (auto& [shardId, tasks]: datashardTasks) { - auto [it, success] = datashardTxs.emplace( - shardId, - TasksGraph.GetMeta().Allocate()); + void BuildDatashardTxs(THashMap>& datashardTasks, TDatashardTxs& datashardTxs, TEvWriteTxs& evWriteTxs, + TTopicTabletTxs& topicTxs) { + for (auto& [shardId, tasks] : datashardTasks) { + auto [it, success] = datashardTxs.emplace(shardId, TasksGraph.GetMeta().Allocate()); YQL_ENSURE(success, "unexpected duplicates in datashard transactions"); NKikimrTxDataShard::TKqpTransaction* dsTxs = it->second; dsTxs->MutableTasks()->Reserve(tasks.size()); - for (auto& task: tasks) { + for (auto& task : tasks) { dsTxs->AddTasks()->Swap(task); } } @@ -2256,22 +2180,18 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet(shardId).IsOlap) { if (auto it = evWriteTxs.find(shardId); it != evWriteTxs.end()) { locks = it->second->MutableLocks(); } else { - auto [eIt, success] = evWriteTxs.emplace( - shardId, - TasksGraph.GetMeta().Allocate()); + auto [eIt, success] = evWriteTxs.emplace(shardId, TasksGraph.GetMeta().Allocate()); locks = eIt->second->MutableLocks(); } } else { if (auto it = datashardTxs.find(shardId); it != datashardTxs.end()) { locks = it->second->MutableLocks(); } else { - auto [eIt, success] = datashardTxs.emplace( - shardId, - TasksGraph.GetMeta().Allocate()); + auto [eIt, success] = datashardTxs.emplace(shardId, TasksGraph.GetMeta().Allocate()); locks = eIt->second->MutableLocks(); } } @@ -2328,7 +2248,7 @@ class TKqpDataExecuter : public TKqpExecuterBase sendingShardsSet; absl::flat_hash_set receivingShardsSet; - absl::flat_hash_set sendingColumnShardsSet; absl::flat_hash_set receivingColumnShardsSet; ui64 arbiter = 0; std::optional columnShardArbiter; @@ -2370,31 +2289,20 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet(shardId).IsOlap && HtapTx) { - if (tx->HasLocks()) { - // Locks may be broken so shards with locks need to send readsets - sendingColumnShardsSet.insert(shardId); - } - if (ShardsWithEffects.contains(shardId)) { - // Volatile transactions may abort effects, so they send readsets - if (VolatileTx) { - sendingColumnShardsSet.insert(shardId); - } - // Effects are only applied when all locks are valid - receivingColumnShardsSet.insert(shardId); - } - } else { - if (tx->HasLocks()) { - // Locks may be broken so shards with locks need to send readsets + if (tx->HasLocks()) { + // Locks may be broken so shards with locks need to send readsets + sendingShardsSet.insert(shardId); + } + if (ShardsWithEffects.contains(shardId)) { + // Volatile transactions may abort effects, so they send readsets + if (VolatileTx) { sendingShardsSet.insert(shardId); } - if (ShardsWithEffects.contains(shardId)) { - // Volatile transactions may abort effects, so they send readsets - if (VolatileTx) { - sendingShardsSet.insert(shardId); - } - // Effects are only applied when all locks are valid - receivingShardsSet.insert(shardId); + // Effects are only applied when all locks are valid + receivingShardsSet.insert(shardId); + + if (HtapTx && ShardIdToTableInfo->at(shardId).IsOlap) { + receivingColumnShardsSet.insert(shardId); } } } @@ -2432,10 +2340,8 @@ class TKqpDataExecuter : public TKqpExecuterBase= minArbiterMeshSize && - AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())) - { + if ((VolatileTx && receivingShardsSet.size() >= minArbiterMeshSize && + AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())) { std::vector candidates; candidates.reserve(receivingShardsSet.size()); for (ui64 candidate : receivingShardsSet) { @@ -2452,17 +2358,14 @@ class TKqpDataExecuter : public TKqpExecuterBase(receivingColumnShardsSet.size()); auto arbiterIterator = std::begin(receivingColumnShardsSet); std::advance(arbiterIterator, index); columnShardArbiter = *arbiterIterator; - - sendingShardsSet.insert(*columnShardArbiter); - receivingShardsSet.insert(*columnShardArbiter); } } - // Encode sending/receiving shards in tx bodies if (needCommit) { NProtoBuf::RepeatedField sendingShards(sendingShardsSet.begin(), sendingShardsSet.end()); @@ -2471,39 +2374,68 @@ class TKqpDataExecuter : public TKqpExecuterBase sendingColumnShards(sendingColumnShardsSet.begin(), sendingColumnShardsSet.end()); - NProtoBuf::RepeatedField receivingColumnShards(receivingColumnShardsSet.begin(), receivingColumnShardsSet.end()); - - std::sort(sendingColumnShards.begin(), sendingColumnShards.end()); - std::sort(receivingColumnShards.begin(), receivingColumnShards.end()); - for (auto& [shardId, shardTx] : datashardTxs) { + AFL_ENSURE(!columnShardArbiter); shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; - *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; - if (arbiter) { - shardTx->MutableLocks()->SetArbiterShard(arbiter); + if (columnShardArbiter) { + shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter); + shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter); + if (sendingShardsSet.contains(shardId)) { + shardTx->MutableLocks()->AddSendingShards(shardId); + } + if (receivingShardsSet.contains(shardId)) { + shardTx->MutableLocks()->AddReceivingShards(shardId); + } + AFL_ENSURE(!arbiter); + } else { + *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; + *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; + if (arbiter) { + shardTx->MutableLocks()->SetArbiterShard(arbiter); + } } } - for (auto& [_, tx] : evWriteTxs) { + for (auto& [shardId, tx] : evWriteTxs) { tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - *tx->MutableLocks()->MutableSendingShards() = sendingShards; - *tx->MutableLocks()->MutableReceivingShards() = receivingShards; - *tx->MutableLocks()->MutableSendingColumnShards() = sendingColumnShards; - *tx->MutableLocks()->MutableReceivingColumnShards() = receivingColumnShards; - if (arbiter) { - tx->MutableLocks()->SetArbiterShard(arbiter); - } - if (columnShardArbiter) { + if (columnShardArbiter && *columnShardArbiter == shardId) { + tx->MutableLocks()->SetArbiterColumnShard(*columnShardArbiter); + *tx->MutableLocks()->MutableSendingShards() = sendingShards; + *tx->MutableLocks()->MutableReceivingShards() = receivingShards; + } else if (columnShardArbiter) { tx->MutableLocks()->SetArbiterColumnShard(*columnShardArbiter); + tx->MutableLocks()->AddSendingShards(*columnShardArbiter); + tx->MutableLocks()->AddReceivingShards(*columnShardArbiter); + if (sendingShardsSet.contains(shardId)) { + tx->MutableLocks()->AddSendingShards(shardId); + } + if (receivingShardsSet.contains(shardId)) { + tx->MutableLocks()->AddReceivingShards(shardId); + } + } else { + *tx->MutableLocks()->MutableSendingShards() = sendingShards; + *tx->MutableLocks()->MutableReceivingShards() = receivingShards; + if (arbiter) { + tx->MutableLocks()->SetArbiterShard(arbiter); + } } } - for (auto& [_, t] : topicTxs) { + for (auto& [shardId, t] : topicTxs) { t.tx.SetOp(NKikimrPQ::TDataTransaction::Commit); - *t.tx.MutableSendingShards() = sendingShards; - *t.tx.MutableReceivingShards() = receivingShards; + if (columnShardArbiter) { + t.tx.AddSendingShards(*columnShardArbiter); + t.tx.AddReceivingShards(*columnShardArbiter); + if (sendingShardsSet.contains(shardId)) { + t.tx.AddSendingShards(shardId); + } + if (receivingShardsSet.contains(shardId)) { + t.tx.AddReceivingShards(shardId); + } + } else { + *t.tx.MutableSendingShards() = sendingShards; + *t.tx.MutableReceivingShards() = receivingShards; + } YQL_ENSURE(!arbiter); } } @@ -2535,13 +2467,13 @@ class TKqpDataExecuter : public TKqpExecuterBasePlanExecution(); if (err) { @@ -2636,23 +2567,19 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPendingComputeTasks().size() : 0) - << ", useFollowers: " << GetUseFollowers()); + LOG_I("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: " << ReadOnlyTx << ", datashardTxs: " << DatashardTxs.size() + << ", evWriteTxs: " << EvWriteTxs.size() << ", topicTxs: " << Request.TopicOperations.GetSize() + << ", volatile: " << VolatileTx << ", immediate: " << ImmediateTx << ", pending compute tasks" + << (Planner ? Planner->GetPendingComputeTasks().size() : 0) << ", useFollowers: " << GetUseFollowers()); // error LOG_T("Updating channels after the creation of compute actors"); THashMap> updates; for (ui64 taskId : ComputeTasks) { auto& task = TasksGraph.GetTask(taskId); - if (task.ComputeActorId) + if (task.ComputeActorId) { CollectTaskChannelsUpdates(task, updates); + } } PropagateChannelsUpdates(updates); @@ -2684,18 +2611,12 @@ class TKqpDataExecuter : public TKqpExecuterBaseFollower = false; @@ -2714,7 +2635,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseBecome(&TThis::WaitShutdownState); LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and " - << Planner->GetPendingComputeActors().size() << " compute actors"); + << Planner->GetPendingComputeActors().size() << " compute actors"); // TODO(ilezhankin): the CA awaiting timeout should be configurable. TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison)); } @@ -2741,7 +2662,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetTypeRewrite()) { + switch (ev->GetTypeRewrite()) { hFunc(TEvDqCompute::TEvState, HandleShutdown); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown); hFunc(TEvents::TEvPoison, HandleShutdown); @@ -2815,8 +2736,7 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; TShardIdToTableInfoPtr ShardIdToTableInfo; - const bool HtapTx = false; bool HasExternalSources = false; bool SecretSnapshotRequired = false; @@ -2879,18 +2804,17 @@ class TKqpDataExecuter : public TKqpExecuterBase& userToken, - TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, +IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, + const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, + const TActorId& creator, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) + const TShardIdToTableInfoPtr& shardIdToTableInfo) { - return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, - std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, userRequestContext, - useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx); + return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, + tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext, + statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 03d739b4f6c0..182b0aa1619d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -102,14 +102,11 @@ struct TKqpFederatedQuerySetup; IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TActorId& creator, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx); + const TShardIdToTableInfoPtr& shardIdToTableInfo); IActor* CreateKqpSchemeExecuter( TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index c1c617a3a277..89da1bd869d6 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -78,23 +78,19 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TActorId& creator, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) + const TShardIdToTableInfoPtr& shardIdToTableInfo) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, - aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx - ); + tableServiceConfig, std::move(asyncIoFactory), creator, + userRequestContext, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo); } TMaybe txsType; @@ -114,24 +110,24 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt case NKqpProto::TKqpPhyTx::TYPE_DATA: return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, - aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx + tableServiceConfig, std::move(asyncIoFactory), creator, + userRequestContext, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo ); case NKqpProto::TKqpPhyTx::TYPE_SCAN: return CreateKqpScanExecuter( - std::move(request), database, userToken, counters, aggregation, - executerRetriesConfig, preparedQuery, chanTransportVersion, userRequestContext, + std::move(request), database, userToken, counters, + tableServiceConfig, preparedQuery, userRequestContext, statementResultIndex ); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: return CreateKqpDataExecuter( std::move(request), database, userToken, counters, true, - aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx + tableServiceConfig, std::move(asyncIoFactory), creator, + userRequestContext, statementResultIndex, + federatedQuerySetup, GUCSettings, shardIdToTableInfo ); default: diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index a3546e919bef..7d235f0ab68c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -123,9 +123,7 @@ class TKqpExecuterBase : public TActorBootstrapped { TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, + const NKikimrConfig::TTableServiceConfig tableServiceConfig, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false) : Request(std::move(request)) @@ -134,8 +132,8 @@ class TKqpExecuterBase : public TActorBootstrapped { , Counters(counters) , ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName) , Planner(nullptr) - , ExecuterRetriesConfig(executerRetriesConfig) - , AggregationSettings(aggregation) + , ExecuterRetriesConfig(tableServiceConfig.GetExecuterRetriesConfig()) + , AggregationSettings(tableServiceConfig.GetAggregationConfig()) , HasOlapTable(false) , StreamResult(streamResult) , StatementResultIndex(statementResultIndex) @@ -143,7 +141,7 @@ class TKqpExecuterBase : public TActorBootstrapped { TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId); TasksGraph.GetMeta().Arena = MakeIntrusive(); TasksGraph.GetMeta().Database = Database; - TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion; + TasksGraph.GetMeta().ChannelTransportVersion = tableServiceConfig.GetChannelTransportVersion(); TasksGraph.GetMeta().UserRequestContext = userRequestContext; ResponseEv = std::make_unique(Request.TxAlloc, ExecType); ResponseEv->Orbit = std::move(Request.Orbit); @@ -1993,20 +1991,15 @@ class TKqpExecuterBase : public TActorBootstrapped { IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TActorId& creator, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx); + const TShardIdToTableInfoPtr& shardIdToTableInfo); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex); } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index a4b7363fe7a2..6c652f602f58 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -45,13 +45,11 @@ class TKqpScanExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, + : TBase(std::move(request), database, userToken, counters, tableServiceConfig, userRequestContext, statementResultIndex, TWilsonKqp::ScanExecuter, "ScanExecuter", false ) @@ -363,13 +361,11 @@ class TKqpScanExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex) { - return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, - preparedQuery, chanTransportVersion, userRequestContext, statementResultIndex); + return new TKqpScanExecuter(std::move(request), database, userToken, counters, tableServiceConfig, + preparedQuery, userRequestContext, statementResultIndex); } } // namespace NKqp diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index e73466374f81..a336dc9958e8 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -379,7 +379,6 @@ class TKqpNodeService : public TActorBootstrapped { FORCE_VALUE(MkqlHeavyProgramMemoryLimit) FORCE_VALUE(QueryMemoryLimit) FORCE_VALUE(PublishStatisticsIntervalSec); - FORCE_VALUE(EnableInstantMkqlMemoryAlloc); FORCE_VALUE(MaxTotalChannelBuffersSize); FORCE_VALUE(MinChannelBufferSize); FORCE_VALUE(MinMemAllocSize); diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 6a10ccdbf721..bbbda9b390f4 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -2491,7 +2491,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformerHasTx()) { - TKikimrTransactionContextBase emptyCtx(SessionCtx->Config().EnableKqpImmediateEffects); + TKikimrTransactionContextBase emptyCtx; emptyCtx.SetTempTables(SessionCtx->GetTempTablesState()); return emptyCtx.ApplyTableOperations(tableOps, tableInfo, queryType); } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 246fd20b3e99..2c14581e15f0 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -260,8 +260,8 @@ bool AddDmlIssue(const TIssue& issue, TExprContext& ctx); class TKikimrTransactionContextBase : public TThrRefBase { public: - explicit TKikimrTransactionContextBase(bool enableImmediateEffects) : EnableImmediateEffects(enableImmediateEffects) { - } + explicit TKikimrTransactionContextBase() + {} bool HasStarted() const { return EffectiveIsolationLevel.Defined(); @@ -412,28 +412,10 @@ class TKikimrTransactionContextBase : public TThrRefBase { const bool currentModify = currentOps & KikimrModifyOps(); if (currentModify) { if (KikimrReadOps() & newOp) { - if (!EnableImmediateEffects) { - TString message = TStringBuilder() << "Data modifications previously made to table '" << table - << "' in current transaction won't be seen by operation: '" - << newOp << "'"; - const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); - auto newIssue = AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message)); - issues.AddIssue(newIssue); - return {false, issues}; - } - HasUncommittedChangesRead = true; } if ((*info)->GetHasIndexTables()) { - if (!EnableImmediateEffects) { - TString message = TStringBuilder() - << "Multiple modification of table with secondary indexes is not supported yet"; - const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); - issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return {false, issues}; - } - HasUncommittedChangesRead = true; } } @@ -448,7 +430,6 @@ class TKikimrTransactionContextBase : public TThrRefBase { public: bool HasUncommittedChangesRead = false; - const bool EnableImmediateEffects; THashMap TableOperations; THashMap TableByIdMap; TMaybe EffectiveIsolationLevel; diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 996140db6750..cce434af338a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -155,7 +155,6 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableKqpScanQueryStreamIdxLookupJoin = false; bool EnableKqpDataQueryStreamIdxLookupJoin = false; bool PredicateExtract20 = false; - bool EnableKqpImmediateEffects = false; bool EnablePreparedDdl = false; bool EnableSequences = false; bool EnableColumnsWithDefault = false; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 6632865dc0c5..8a75fe1ee3fe 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -818,7 +818,6 @@ class TKqpResourceManagerActor : public TActorBootstrappedHasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { - YQL_ENSURE(TxCtx->EnableImmediateEffects); + if (TxCtx->HasOlapTable) { + // HTAP/OLAP transactions always use separate commit. + return false; + } + if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { if (tx && tx->GetHasEffects()) { YQL_ENSURE(tx->ResultsSize() == 0); // commit can be applied to the last transaction with effects diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index f22acea1a10d..8769dafefcc7 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -758,7 +758,7 @@ class TKqpSessionActor : public TActorBootstrapped { void BeginTx(const Ydb::Table::TransactionSettings& settings) { QueryState->TxId.SetValue(UlidGen.Next()); QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, - AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); + AppData()->TimeProvider, AppData()->RandomProvider); auto& alloc = QueryState->TxCtx->TxAlloc; ui64 mkqlInitialLimit = Settings.MkqlInitialMemoryLimit; @@ -838,7 +838,7 @@ class TKqpSessionActor : public TActorBootstrapped { } } else { QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, - AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); + AppData()->TimeProvider, AppData()->RandomProvider); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; } @@ -1232,7 +1232,7 @@ class TKqpSessionActor : public TActorBootstrapped { } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); - if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { + if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) { YQL_ENSURE(txCtx.EnableImmediateEffects); request.UseImmediateEffects = true; } @@ -1292,29 +1292,12 @@ class TKqpSessionActor : public TActorBootstrapped { request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - bool useEvWrite = ( - (txCtx->HasOlapTable // olap only - && !txCtx->HasOltpTable - && Settings.TableService.GetEnableOlapSink()) - || (txCtx->HasOltpTable // oltp only - && !txCtx->HasOlapTable - && Settings.TableService.GetEnableOltpSink()) - || (txCtx->HasOlapTable // htap - && txCtx->HasOltpTable - && Settings.TableService.GetEnableOlapSink() - && Settings.TableService.GetEnableHtapTx())) - && (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED - || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY - || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY - || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) - || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr(), - RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), - AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), + RequestCounters, Settings.TableService, + AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo, - Settings.TableService.GetEnableHtapTx()); + QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -1881,11 +1864,7 @@ class TKqpSessionActor : public TActorBootstrapped { void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { ui64 proxyRequestId = ev->Cookie; - auto busyStatus = Settings.TableService.GetUseSessionBusyStatus() - ? Ydb::StatusIds::SESSION_BUSY - : Ydb::StatusIds::PRECONDITION_FAILED; - - ReplyProcessError(ev->Sender, proxyRequestId, busyStatus, "Pending previous query completion"); + ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::SESSION_BUSY, "Pending previous query completion"); } static bool IsFatalError(const Ydb::StatusIds::StatusCode status) { diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index a18f69b7f347..dee3492d8939 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -332,11 +332,7 @@ class TKqpWorkerActor : public TActorBootstrapped { if (CleanupState->Final) { ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_SESSION, "Session is being closed"); } else { - auto busyStatus = Settings.TableService.GetUseSessionBusyStatus() - ? Ydb::StatusIds::SESSION_BUSY - : Ydb::StatusIds::PRECONDITION_FAILED; - - ReplyProcessError(ev->Sender, proxyRequestId, busyStatus, "Pending previous query completion"); + ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::SESSION_BUSY, "Pending previous query completion"); } } @@ -893,11 +889,7 @@ class TKqpWorkerActor : public TActorBootstrapped { return; } - auto busyStatus = Settings.TableService.GetUseSessionBusyStatus() - ? Ydb::StatusIds::SESSION_BUSY - : Ydb::StatusIds::PRECONDITION_FAILED; - - ReplyProcessError(ev->Sender, proxyRequestId, busyStatus, + ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::SESSION_BUSY, "Pending previous query completion"); } diff --git a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp index 7b189f3b0e37..95f47eca1a52 100644 --- a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp @@ -58,7 +58,6 @@ namespace { Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(Upsert) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -115,7 +114,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UpsertDuplicates) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -149,7 +147,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UpsertExistingKey) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -196,7 +193,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(Replace) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -253,7 +249,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ReplaceDuplicates) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -287,7 +282,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ReplaceExistingKey) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -334,7 +328,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(Insert) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -391,7 +384,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(InsertDuplicates) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -419,7 +411,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(InsertExistingKey) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -446,7 +437,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UpdateOn) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -499,7 +489,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(Delete) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -548,7 +537,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UpdateAfterUpsert) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -578,7 +566,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(DeleteAfterUpsert) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -617,7 +604,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UpdateAfterInsert) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -647,7 +633,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(DeleteAfterInsert) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -687,7 +672,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UpsertAfterInsert) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -713,7 +697,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UpsertAfterInsertWithIndex) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -789,7 +772,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(DeleteOnAfterInsertWithIndex) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -851,7 +833,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(MultipleEffectsWithIndex) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -892,7 +873,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(InsertConflictTxAborted) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -933,7 +913,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UpsertConflictInteractiveTxAborted) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1006,7 +985,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(MultiShardUpsertAfterRead) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1039,7 +1017,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(TxWithReadAtTheEnd) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1083,7 +1060,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(InteractiveTxWithReadAtTheEnd) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1146,7 +1122,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(TxWithWriteAtTheEnd) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1189,7 +1164,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(InteractiveTxWithWriteAtTheEnd) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1251,7 +1225,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(UnobservedUncommittedChangeConflict) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1330,7 +1303,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(AlreadyBrokenImmediateEffects) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1418,7 +1390,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(WriteThenReadWithCommit) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1469,7 +1440,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyR1WR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1527,7 +1497,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyR1RWR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1589,7 +1558,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyR1WRR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1651,7 +1619,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyW1RR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1709,7 +1676,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyW1WR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1765,7 +1731,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyW1RWR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1822,7 +1787,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyW1WRR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1879,7 +1843,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyRW1RR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1941,7 +1904,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyRW1WR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -1998,7 +1960,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyRW1RWR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -2056,7 +2017,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ConflictingKeyRW1WRR2) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); @@ -2117,7 +2077,6 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { Y_UNIT_TEST(ForceImmediateEffectsExecution) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); auto serverSettings = TKikimrSettings().SetAppConfig(appConfig).SetEnableForceImmediateEffectsExecution(true); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index aa75e705876c..4ee35f1fdb2f 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -1466,10 +1466,6 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - const auto& config = kikimr.GetTestServer().GetSettings().AppConfig; - auto& tableSettings = config->GetTableServiceConfig(); - bool useSchemeCacheMeta = tableSettings.GetUseSchemeCacheMetadata(); - { auto tableBuilder = db.GetTableBuilder(); tableBuilder @@ -1541,8 +1537,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()) .ExtractValueSync(); // KIKIMR-7997 - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), - useSchemeCacheMeta ? NYdb::EStatus::SCHEME_ERROR : NYdb::EStatus::GENERIC_ERROR); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR); } { diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index ac96894be004..476293242f27 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -203,7 +203,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { } } - auto permissionsSettings = + auto permissionsSettings = NYdb::NScheme::TModifyPermissionsSettings() .AddGrantPermissions(NYdb::NScheme::TPermissions("user0@builtin", grantPermissions)) .AddRevokePermissions(NYdb::NScheme::TPermissions("user0@builtin", revokePermissions)); @@ -384,10 +384,9 @@ Y_UNIT_TEST_SUITE(KqpQuery) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, true, result.GetIssues().ToString()); } - Y_UNIT_TEST_TWIN(QueryClientTimeout, EnableImmediateEffects) { + Y_UNIT_TEST(QueryClientTimeout) { NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - app.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(EnableImmediateEffects); auto serverSettings = TKikimrSettings() .SetAppConfig(app); @@ -1459,8 +1458,8 @@ Y_UNIT_TEST_SUITE(KqpQuery) { auto client = kikimr.GetQueryClient(); auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - { + + { auto prepareResult = client.ExecuteQuery(R"( REPLACE INTO `/Root/Source` (Col1, Col2) VALUES (1u, 1), (100u, 100), (10u, 10); @@ -1623,7 +1622,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - { + { auto prepareResult = client.ExecuteQuery(R"( REPLACE INTO `/Root/Source` (Col1, Col2) VALUES (1u, 1), (100u, 100), (10u, 10); @@ -1676,7 +1675,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - { + { auto prepareResult = client.ExecuteQuery(R"( REPLACE INTO `/Root/Source` (Col1, Col2) VALUES (1u, 1), (100u, 100), (10u, 10); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 3c429f7a6e78..a16e3c3f7219 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -33,22 +33,19 @@ Y_UNIT_TEST_SUITE(KqpScheme) { auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - auto& tableSettings = kikimr.GetTestServer().GetSettings().AppConfig->GetTableServiceConfig(); - bool useSchemeCacheMeta = tableSettings.GetUseSchemeCacheMetadata(); - auto result = session.ExecuteDataQuery(R"( SELECT * FROM `/Root/KeyValue`; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); result.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), - useSchemeCacheMeta ? EStatus::SCHEME_ERROR : EStatus::UNAUTHORIZED, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C( + result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); result = session.ExecuteDataQuery(R"( SELECT * FROM `/Root/NonExistent`; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); result.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), - useSchemeCacheMeta ? EStatus::SCHEME_ERROR : EStatus::UNAUTHORIZED, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C( + result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); } Y_UNIT_TEST(UseNonexistentTable) { @@ -1629,14 +1626,14 @@ Y_UNIT_TEST_SUITE(KqpScheme) { Datetime64Column Datetime64, PRIMARY KEY (Key) ) WITH ( - TTL = Interval("P1D") ON Datetime64Column - ))"; - Cerr << query << Endl; + TTL = Interval("P1D") ON Datetime64Column + ))"; + Cerr << query << Endl; { auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } - } + } void CreateTableWithUniformPartitions(bool compat) { TKikimrRunner kikimr; @@ -2470,7 +2467,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { ); auto result = adminSession.ExecuteSchemeQuery(grantQuery).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - + // It was discovered that TModifyACL scheme operation returns successfully without waiting for // SchemeBoard replicas to acknowledge the path updates. This can cause the SchemeCache to reply // with outdated entries, even if the SyncVersion flag is enabled. diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 98a4f407d66a..cf7e6cb36945 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3037,10 +3037,10 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } - Y_UNIT_TEST(TableSink_Htap) { + Y_UNIT_TEST_TWIN(TableSink_Htap, withOltpSink) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); - appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink); appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true); auto settings = TKikimrSettings() .SetAppConfig(appConfig) @@ -3053,7 +3053,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { const TString query = R"( CREATE TABLE `/Root/ColumnShard` ( Col1 Uint64 NOT NULL, - Col2 String, + Col2 String NOT NULL, Col3 Int32 NOT NULL, PRIMARY KEY (Col1) ) @@ -3062,7 +3062,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CREATE TABLE `/Root/DataShard` ( Col1 Uint64 NOT NULL, - Col2 String, + Col2 String NOT NULL, Col3 Int32 NOT NULL, PRIMARY KEY (Col1) ) @@ -3073,12 +3073,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); auto client = kikimr.GetQueryClient(); + { auto result = client.ExecuteQuery(R"( UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES - (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, "test", 13); UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES - (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13); + (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, "test", 13); INSERT INTO `/Root/ColumnShard` SELECT * FROM `/Root/DataShard`; REPLACE INTO `/Root/DataShard` SELECT * FROM `/Root/ColumnShard`; SELECT * FROM `/Root/ColumnShard` ORDER BY Col1; @@ -3189,13 +3190,12 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { { auto result = client.ExecuteQuery(R"( - SELECT * FROM `/Root/DataShard` ORDER BY Col1; - SELECT * FROM `/Root/ColumnShard` ORDER BY Col1; + SELECT * FROM `/Root/DataShard`; + SELECT * FROM `/Root/ColumnShard`; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13];[101u;"test";101];[102u;"test";101];[103u;"test";101];[104u;"test";101];[1001u;"test";1001];[1002u;"test";1001];[1003u;"test";1001];[1004u;"test";1001]])", - FormatResultSetYson(result.GetResultSet(0))); - CompareYson(R"([[1u;"test";1];[2u;"test";1];[3u;"test";1];[4u;"test";1]])", FormatResultSetYson(result.GetResultSet(1))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1))); } { @@ -3210,11 +3210,11 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto result = client.ExecuteQuery(R"( SELECT * FROM `/Root/DataShard`; SELECT * FROM `/Root/ColumnShard`; - SELECT * FROM `/Root/DataShard`; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); - CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1))); + CompareYson(R"([[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13];[101u;"test";101];[102u;"test";101];[103u;"test";101];[104u;"test";101];[1001u;"test";1001];[1002u;"test";1001];[1003u;"test";1001];[1004u;"test";1001]])", + FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[1u;"test";1];[2u;"test";1];[3u;"test";1];[4u;"test";1]])", FormatResultSetYson(result.GetResultSet(1))); } { diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp index 6163bb9aa439..e33e1fc12bd2 100644 --- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp @@ -171,7 +171,6 @@ Y_UNIT_TEST_SUITE(KqpService) { Y_UNIT_TEST(SessionBusy) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetUseSessionBusyStatus(true); auto kikimr = DefaultKikimrRunner({}, appConfig); auto db = kikimr.GetTableClient(); @@ -191,7 +190,6 @@ Y_UNIT_TEST_SUITE(KqpService) { Y_UNIT_TEST(SessionBusyRetryOperation) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetUseSessionBusyStatus(true); auto kikimr = DefaultKikimrRunner({}, appConfig); auto db = kikimr.GetTableClient(); @@ -223,7 +221,6 @@ Y_UNIT_TEST_SUITE(KqpService) { Y_UNIT_TEST(SessionBusyRetryOperationSync) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetUseSessionBusyStatus(true); auto kikimr = DefaultKikimrRunner({}, appConfig); auto db = kikimr.GetTableClient(); diff --git a/ydb/core/protos/data_events.proto b/ydb/core/protos/data_events.proto index 677318e04837..dde26fa77048 100644 --- a/ydb/core/protos/data_events.proto +++ b/ydb/core/protos/data_events.proto @@ -39,8 +39,6 @@ message TKqpLocks { optional uint64 ArbiterShard = 5; optional uint64 ArbiterColumnShard = 6; - repeated uint64 SendingColumnShards = 7; - repeated uint64 ReceivingColumnShards = 8; } message TTableId { diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 190a2b350987..03d109262541 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -27,7 +27,7 @@ message TTableServiceConfig { optional uint64 MkqlHeavyProgramMemoryLimit = 5 [default = 31457280]; // 30 MB optional uint64 QueryMemoryLimit = 6 [default = 32212254720]; // 30 GB optional uint32 PublishStatisticsIntervalSec = 7 [default = 2]; - optional bool EnableInstantMkqlMemoryAlloc = 8 [default = true]; + reserved 8; // optional bool EnableInstantMkqlMemoryAlloc = 8 [default = true]; reserved 9; // optional uint32 InstantMkqlMemoryAllocWaitTimeMs = 9 [default = 20]; reserved 10; // optional uint32 InstantMkqlMemoryAllocStepTimeMs = 10 [default = 1]; @@ -210,8 +210,8 @@ message TTableServiceConfig { optional uint32 CompileQueryCacheSize = 6 [default = 1000]; optional uint32 CompileMaxActiveRequests = 7 [default = 4]; optional uint32 CompileRequestQueueSize = 8 [default = 1000]; - optional bool UseSchemeCacheMetadata = 9 [default = true]; - optional bool UseSessionBusyStatus = 10 [default = true]; + reserved 9; // optional bool UseSchemeCacheMetadata = 9 [default = true]; + reserved 10; // optional bool UseSessionBusyStatus = 10 [default = true]; reserved 11; // (deprecated) AllowUnsafeCommit optional uint32 CompileTimeoutMs = 12 [default = 60000]; optional TResourceManager ResourceManager = 13; @@ -241,7 +241,7 @@ message TTableServiceConfig { optional bool EnableKqpDataQueryStreamIdxLookupJoin = 49 [default = false]; reserved 36; //optional bool EnablePredicateExtractForScanQueries = 36 [default = true]; reserved 37; //optional bool EnablePredicateExtractForDataQueries = 37 [default = true]; - optional bool EnableKqpImmediateEffects = 38 [default = true]; + reserved 38; // optional bool EnableKqpImmediateEffects = 38 [default = true]; reserved 39; // optional bool EnableSequentialReads = 39 [default = true]; optional bool EnablePreparedDdl = 42 [default = true]; optional bool EnableSequences = 43 [default = true]; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 8410fe52ad16..93dda2268ae5 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -281,7 +281,7 @@ class TCommitOperation { bool IsPrimary() const { AFL_VERIFY(NeedSyncLocks()); - return TabletId == *ReceivingShards.begin(); + return TabletId == ArbiterColumnShard; } TCommitOperation(const ui64 tabletId) @@ -293,13 +293,23 @@ class TCommitOperation { auto& locks = evWrite.Record.GetLocks(); auto& lock = evWrite.Record.GetLocks().GetLocks()[0]; SendingShards = std::set(locks.GetSendingShards().begin(), locks.GetSendingShards().end()); - if ((ui32)locks.GetSendingShards().size() != SendingShards.size()) { - return TConclusionStatus::Fail("duplications in SendingShards proto field"); - } ReceivingShards = std::set(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end()); - if ((ui32)locks.GetReceivingShards().size() != ReceivingShards.size()) { - return TConclusionStatus::Fail("duplications in ReceivingShards proto field"); + if (!ReceivingShards.size() || !SendingShards.size()) { + ReceivingShards.clear(); + SendingShards.clear(); + } else if (!locks.HasArbiterColumnShard()) { + ArbiterColumnShard = *ReceivingShards.begin(); + if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { + return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + } + } else { + ArbiterColumnShard = locks.GetArbiterColumnShard(); + AFL_VERIFY(ArbiterColumnShard); + if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { + return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + } } + TxId = evWrite.Record.GetTxId(); LockId = lock.GetLockId(); Generation = lock.GetGeneration(); @@ -313,14 +323,6 @@ class TCommitOperation { if (evWrite.Record.GetLocks().GetOp() != NKikimrDataEvents::TKqpLocks::Commit) { return TConclusionStatus::Fail("incorrect message type"); } - if (!ReceivingShards.size() || !SendingShards.size()) { - ReceivingShards.clear(); - SendingShards.clear(); - } else { - if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); - } - } return TConclusionStatus::Success(); } @@ -331,8 +333,8 @@ class TCommitOperation { return std::make_unique( TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards); } else { - return std::make_unique( - TFullTxInfo::BuildFake(kind), LockId, *ReceivingShards.begin(), ReceivingShards.contains(TabletId)); + return std::make_unique(TFullTxInfo::BuildFake(kind), LockId, + ArbiterColumnShard, ReceivingShards.contains(TabletId)); } } @@ -343,6 +345,7 @@ class TCommitOperation { YDB_READONLY(ui64, TxId, 0); YDB_READONLY_DEF(std::set, SendingShards); YDB_READONLY_DEF(std::set, ReceivingShards); + ui64 ArbiterColumnShard = 0; }; class TProposeWriteTransaction: public NTabletFlatExecutor::TTransactionBase { @@ -430,7 +433,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const auto source = ev->Sender; const auto cookie = ev->Cookie; const auto behaviourConclusion = TOperationsManager::GetBehaviour(*ev->Get()); - // AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString()); +// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString()); if (behaviourConclusion.IsFail()) { Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index 9073e7458ae3..2ddb57cc4b1e 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -203,9 +203,9 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac for (auto&& i : ReceivingShards) { if (WaitShardsResultAck.contains(i)) { NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent), - new TEvPipeCache::TEvForward( - new TEvTxProcessing::TEvReadSet(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), readSetData.SerializeAsString()), i, - true), + new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(TxInfo.PlanStep, GetTxId(), owner.TabletID(), i, + owner.TabletID(), readSetData.SerializeAsString()), + i, true), IEventHandle::FlagTrackDelivery, GetTxId()); } } diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index f7bbe62eed62..fb12ecdbcd8d 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1699,7 +1699,6 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); NKikimrConfig::TAppConfig appCfg; - appCfg.MutableTableServiceConfig()->SetEnableKqpImmediateEffects(true); serverSettings.SetDomainName("Root") .SetUseRealThreads(false) .SetDomainPlanResolution(1000) diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index 6d722b459944..afb7af350bf4 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -590,9 +590,6 @@ Y_UNIT_TEST_SUITE(YdbYqlClient) { .UseSecureConnection(NYdbSslTestData::CaCrt) .SetEndpoint(location)); - auto& tableSettings = server.GetServer().GetSettings().AppConfig->GetTableServiceConfig(); - bool useSchemeCacheMeta = tableSettings.GetUseSchemeCacheMetadata(); - { auto session = CreateSession(connection, "root@builtin"); { @@ -648,8 +645,7 @@ Y_UNIT_TEST_SUITE(YdbYqlClient) { )__",TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_EQUAL(status.IsTransportError(), false); - UNIT_ASSERT_EQUAL(status.GetStatus(), - useSchemeCacheMeta ? EStatus::SCHEME_ERROR : EStatus::UNAUTHORIZED); + UNIT_ASSERT_EQUAL(status.GetStatus(), EStatus::SCHEME_ERROR); } } }