Skip to content

Commit

Permalink
Merge 1c3ad36 into 5ef387d
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Oct 25, 2024
2 parents 5ef387d + 1c3ad36 commit d5bcf7b
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 113 deletions.
4 changes: 2 additions & 2 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool ShouldExecuteDeferredEffects() const {
if (HasUncommittedChangesRead) {
if (HasUncommittedChangesRead || HasOlapTable) {
YQL_ENSURE(EnableImmediateEffects);
return !DeferredEffects.Empty();
}
Expand Down Expand Up @@ -298,7 +298,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool CanDeferEffects() const {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
YQL_ENSURE(EnableImmediateEffects);
return false;
}
Expand Down
70 changes: 40 additions & 30 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,19 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
: TBase(std::move(request), database, userToken, counters, tableServiceConfig,
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, UseEvWrite(useEvWrite)
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
, HtapTx(tableServiceConfig.GetEnableHtapTx())
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
, ShardIdToTableInfo(shardIdToTableInfo)
, HtapTx(htapTx)
{
Target = creator;

Expand Down Expand Up @@ -1487,7 +1485,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

auto getShardTask = [&](ui64 shardId) -> TTask& {
YQL_ENSURE(!UseEvWrite);
YQL_ENSURE(!UseEvWriteForOltp);
auto it = shardTasks.find(shardId);
if (it != shardTasks.end()) {
return TasksGraph.GetTask(it->second);
Expand Down Expand Up @@ -1627,7 +1625,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
{
YQL_ENSURE(!UseEvWrite);
YQL_ENSURE(!UseEvWriteForOltp);
TShardState shardState;
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
shardState.DatashardState.ConstructInPlace();
Expand Down Expand Up @@ -2025,7 +2023,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TDatashardTxs datashardTxs;
TEvWriteTxs evWriteTxs;
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
YQL_ENSURE(evWriteTxs.empty() || datashardTxs.empty());

// Single-shard datashard transactions are always immediate
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1
Expand Down Expand Up @@ -2256,7 +2253,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");
NKikimrDataEvents::TKqpLocks* locks = nullptr;

if (UseEvWrite) {
if (UseEvWriteForOltp || ShardIdToTableInfo->Get(shardId).IsOlap) {
if (auto it = evWriteTxs.find(shardId); it != evWriteTxs.end()) {
locks = it->second->MutableLocks();
} else {
Expand Down Expand Up @@ -2328,15 +2325,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Note: currently persistent channels are never used
!HasPersistentChannels &&
// Can't use volatile transactions for EvWrite at current time
!UseEvWrite);
evWriteTxs.empty());

const bool useGenericReadSets = (
// Use generic readsets when feature is explicitly enabled
AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() ||
// Volatile transactions must always use generic readsets
VolatileTx ||
// Transactions with topics must always use generic readsets
!topicTxs.empty());
!topicTxs.empty() ||
// HTAP transactions always use generic readsets
!evWriteTxs.empty());

if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
Expand Down Expand Up @@ -2479,10 +2478,22 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

for (auto& [shardId, shardTx] : datashardTxs) {
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
if (columnShardArbiter) {
shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter);
shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter);
if (sendingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddSendingShards(shardId);
}
if (receivingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddReceivingShards(shardId);
}
AFL_ENSURE(!arbiter);
} else {
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
}
}
}

Expand Down Expand Up @@ -2836,11 +2847,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool UseEvWrite = false;
const bool UseEvWriteForOltp = false;
const bool HtapTx = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;
TShardIdToTableInfoPtr ShardIdToTableInfo;
const bool HtapTx = false;

bool HasExternalSources = false;
bool SecretSnapshotRequired = false;
Expand Down Expand Up @@ -2879,18 +2890,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

} // namespace

IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, userRequestContext,
useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx);
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult,
tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext,
statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
}

} // namespace NKqp
Expand Down
9 changes: 3 additions & 6 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,11 @@ struct TKqpFederatedQuerySetup;

IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
Expand Down
32 changes: 14 additions & 18 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,19 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,

IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
);
tableServiceConfig, std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo);
}

TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
Expand All @@ -114,24 +110,24 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
case NKqpProto::TKqpPhyTx::TYPE_DATA:
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
tableServiceConfig, std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
);

case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(
std::move(request), database, userToken, counters, aggregation,
executerRetriesConfig, preparedQuery, chanTransportVersion, userRequestContext,
std::move(request), database, userToken, counters,
tableServiceConfig, preparedQuery, userRequestContext,
statementResultIndex
);

case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, true,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx
tableServiceConfig, std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo
);

default:
Expand Down
23 changes: 8 additions & 15 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui32 statementResultIndex, ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false)
: Request(std::move(request))
Expand All @@ -134,16 +132,16 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
, Counters(counters)
, ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
, Planner(nullptr)
, ExecuterRetriesConfig(executerRetriesConfig)
, AggregationSettings(aggregation)
, ExecuterRetriesConfig(tableServiceConfig.GetExecuterRetriesConfig())
, AggregationSettings(tableServiceConfig.GetAggregationConfig())
, HasOlapTable(false)
, StreamResult(streamResult)
, StatementResultIndex(statementResultIndex)
{
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
TasksGraph.GetMeta().Database = Database;
TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion;
TasksGraph.GetMeta().ChannelTransportVersion = tableServiceConfig.GetChannelTransportVersion();
TasksGraph.GetMeta().UserRequestContext = userRequestContext;
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc, ExecType);
ResponseEv->Orbit = std::move(Request.Orbit);
Expand Down Expand Up @@ -1993,20 +1991,15 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TPreparedQueryHolder::TConstPtr preparedQuery,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex);

} // namespace NKqp
Expand Down
Loading

0 comments on commit d5bcf7b

Please sign in to comment.