Skip to content

Commit

Permalink
HTAP: ProposeTx + EvWrite (#9236)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 16, 2024
1 parent 8dff0b9 commit e6d795f
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 65 deletions.
4 changes: 2 additions & 2 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool ShouldExecuteDeferredEffects() const {
if (HasUncommittedChangesRead) {
if (HasUncommittedChangesRead || HasOlapTable) {
return !DeferredEffects.Empty();
}

Expand Down Expand Up @@ -297,7 +297,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool CanDeferEffects() const {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
return false;
}

Expand Down
52 changes: 32 additions & 20 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
: TBase(std::move(request), database, userToken, counters, tableServiceConfig,
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, UseEvWrite(useEvWrite)
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
, HtapTx(tableServiceConfig.GetEnableHtapTx())
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
, ShardIdToTableInfo(shardIdToTableInfo)
, HtapTx(htapTx)
{
Target = creator;

Expand Down Expand Up @@ -1487,7 +1487,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

auto getShardTask = [&](ui64 shardId) -> TTask& {
YQL_ENSURE(!UseEvWrite);
YQL_ENSURE(!UseEvWriteForOltp);
auto it = shardTasks.find(shardId);
if (it != shardTasks.end()) {
return TasksGraph.GetTask(it->second);
Expand Down Expand Up @@ -1627,7 +1627,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
{
YQL_ENSURE(!UseEvWrite);
YQL_ENSURE(!UseEvWriteForOltp);
TShardState shardState;
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
shardState.DatashardState.ConstructInPlace();
Expand Down Expand Up @@ -2030,7 +2030,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TDatashardTxs datashardTxs;
TEvWriteTxs evWriteTxs;
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
YQL_ENSURE(evWriteTxs.empty() || datashardTxs.empty());

// Single-shard datashard transactions are always immediate
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1
Expand Down Expand Up @@ -2261,7 +2260,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");
NKikimrDataEvents::TKqpLocks* locks = nullptr;

if (UseEvWrite) {
if (UseEvWriteForOltp || ShardIdToTableInfo->Get(shardId).IsOlap) {
if (auto it = evWriteTxs.find(shardId); it != evWriteTxs.end()) {
locks = it->second->MutableLocks();
} else {
Expand Down Expand Up @@ -2333,15 +2332,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Note: currently persistent channels are never used
!HasPersistentChannels &&
// Can't use volatile transactions for EvWrite at current time
!UseEvWrite);
evWriteTxs.empty());

const bool useGenericReadSets = (
// Use generic readsets when feature is explicitly enabled
AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() ||
// Volatile transactions must always use generic readsets
VolatileTx ||
// Transactions with topics must always use generic readsets
!topicTxs.empty());
!topicTxs.empty() ||
// HTAP transactions always use generic readsets
!evWriteTxs.empty());

if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
Expand Down Expand Up @@ -2463,12 +2464,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
std::sort(receivingShards.begin(), receivingShards.end());

for (auto& [shardId, shardTx] : datashardTxs) {
AFL_ENSURE(!columnShardArbiter);
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
if (columnShardArbiter) {
shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter);
shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter);
if (sendingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddSendingShards(shardId);
}
if (receivingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddReceivingShards(shardId);
}
AFL_ENSURE(!arbiter);
} else {
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
}
}
}

Expand Down Expand Up @@ -2844,11 +2856,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool UseEvWrite = false;
const bool UseEvWriteForOltp = false;
const bool HtapTx = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;
TShardIdToTableInfoPtr ShardIdToTableInfo;
const bool HtapTx = false;

bool HasExternalSources = false;
bool SecretSnapshotRequired = false;
Expand Down Expand Up @@ -2890,13 +2902,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig,
std::move(asyncIoFactory), creator, userRequestContext,
useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx);
statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
}

} // namespace NKqp
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
Expand Down
17 changes: 8 additions & 9 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false, tableServiceConfig,
std::move(asyncIoFactory), creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
);
}

Expand All @@ -113,8 +112,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false, tableServiceConfig,
std::move(asyncIoFactory), creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
);

case NKqpProto::TKqpPhyTx::TYPE_SCAN:
Expand All @@ -128,8 +127,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, true,
tableServiceConfig, std::move(asyncIoFactory), creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx
userRequestContext, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo
);

default:
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2031,10 +2031,9 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ class TKqpQueryState : public TNonCopyable {
return false;
}

if (TxCtx->HasOlapTable) {
// HTAP/OLAP transactions always use separate commit.
return false;
}

if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (tx && tx->GetHasEffects()) {
YQL_ENSURE(tx->ResultsSize() == 0);
Expand Down
21 changes: 2 additions & 19 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) {
request.UseImmediateEffects = true;
}
}
Expand Down Expand Up @@ -1292,29 +1292,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
request.ResourceManager_ = ResourceManager_;
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());

bool useEvWrite = (
(txCtx->HasOlapTable // olap only
&& !txCtx->HasOltpTable
&& Settings.TableService.GetEnableOlapSink())
|| (txCtx->HasOltpTable // oltp only
&& !txCtx->HasOlapTable
&& Settings.TableService.GetEnableOltpSink())
|| (txCtx->HasOlapTable // htap
&& txCtx->HasOltpTable
&& Settings.TableService.GetEnableOlapSink()
&& Settings.TableService.GetEnableHtapTx()))
&& (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
|| (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML)
|| (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML));
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.TableService,
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(),
QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo,
Settings.TableService.GetEnableHtapTx());
QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo);

auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
Expand Down
Loading

0 comments on commit e6d795f

Please sign in to comment.