Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[24-3] Mixed transactions: ProposeTx + EvWrite #10932

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+w
ydb/core/kqp/ut/tx KqpSinkTx.InvalidateOnError
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Complex
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Simple
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
Expand All @@ -32,10 +29,8 @@ ydb/core/kqp/ut/scheme [44/50]*
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpUpdate
ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap*
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service [38/50]*
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpUpdate
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
ydb/core/persqueue/ut [37/40] chunk chunk
ydb/core/persqueue/ut [38/40] chunk chunk
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
}

if (txCtx.HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
YQL_ENSURE(txCtx.EnableImmediateEffects);
return true;
}

Expand Down
12 changes: 5 additions & 7 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;
class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
public:
explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry,
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider, bool enableImmediateEffects)
: NYql::TKikimrTransactionContextBase(enableImmediateEffects)
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider)
: NYql::TKikimrTransactionContextBase()
, Implicit(implicit)
, ParamsState(MakeIntrusive<TParamsState>())
{
Expand Down Expand Up @@ -268,8 +268,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool ShouldExecuteDeferredEffects() const {
if (HasUncommittedChangesRead) {
YQL_ENSURE(EnableImmediateEffects);
if (HasUncommittedChangesRead || HasOlapTable) {
return !DeferredEffects.Empty();
}

Expand Down Expand Up @@ -298,8 +297,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool CanDeferEffects() const {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
YQL_ENSURE(EnableImmediateEffects);
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
return false;
}

Expand Down Expand Up @@ -379,7 +377,7 @@ struct THash<NKikimr::NKqp::TTxId> {
};

namespace NKikimr::NKqp {

class TTransactionsCache {
size_t MaxActiveSize;
THashMap<TTxId, TIntrusivePtr<TKqpTransactionContext>, THash<NKikimr::NKqp::TTxId>> Active;
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,6 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableKqpScanQueryStreamLookup = serviceConfig.GetEnableKqpScanQueryStreamLookup();
kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
kqpConfig.EnableKqpDataQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin();
kqpConfig.EnableKqpImmediateEffects = serviceConfig.GetEnableKqpImmediateEffects();
kqpConfig.EnablePreparedDdl = serviceConfig.GetEnablePreparedDdl();
kqpConfig.EnableSequences = serviceConfig.GetEnableSequences();
kqpConfig.EnableColumnsWithDefault = serviceConfig.GetEnableColumnsWithDefault();
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,13 +513,11 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
bool enableKqpDataQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin();
bool enableKqpScanQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();

bool enableKqpDataQuerySourceRead = TableServiceConfig.GetEnableKqpDataQuerySourceRead();
bool enableKqpScanQuerySourceRead = TableServiceConfig.GetEnableKqpScanQuerySourceRead();

bool predicateExtract20 = TableServiceConfig.GetPredicateExtract20();

bool defaultSyntaxVersion = TableServiceConfig.GetSqlVersion();
bool enableKqpImmediateEffects = TableServiceConfig.GetEnableKqpImmediateEffects();

auto indexAutoChooser = TableServiceConfig.GetIndexAutoChooseMode();

Expand Down Expand Up @@ -556,10 +554,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableKqpScanQueryStreamLookup() != enableKqpScanQueryStreamLookup ||
TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin() != enableKqpScanQueryStreamIdxLookupJoin ||
TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin() != enableKqpDataQueryStreamIdxLookupJoin ||
TableServiceConfig.GetEnableKqpDataQuerySourceRead() != enableKqpDataQuerySourceRead ||
TableServiceConfig.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead ||
TableServiceConfig.GetPredicateExtract20() != predicateExtract20 ||
TableServiceConfig.GetEnableKqpImmediateEffects() != enableKqpImmediateEffects ||
TableServiceConfig.GetIndexAutoChooseMode() != indexAutoChooser ||
TableServiceConfig.GetEnableSequences() != enableSequences ||
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,18 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true);
IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true);

/* sink writes */
WriteActorsShardResolve = KqpGroup->GetCounter("SinkWrites/WriteActorShardResolve", true);
WriteActorsCount = KqpGroup->GetCounter("SinkWrites/WriteActorsCount", false);
WriteActorImmediateWrites = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWrites", true);
WriteActorImmediateWritesRetries = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWritesRetries", true);
WriteActorWritesSizeHistogram =
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesSize", NMonitoring::ExponentialHistogram(28, 2, 1));
WriteActorWritesOperationsHistogram =
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesOperations", NMonitoring::ExponentialHistogram(20, 2, 1));
WriteActorWritesLatencyHistogram =
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 1));

/* sequencers */

SequencerActorsCount = KqpGroup->GetCounter("Sequencer/ActorCount", false);
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,15 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;

// Sink write counters
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorsShardResolve;
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorsCount;
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWrites;
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWritesRetries;
NMonitoring::THistogramPtr WriteActorWritesSizeHistogram;
NMonitoring::THistogramPtr WriteActorWritesOperationsHistogram;
NMonitoring::THistogramPtr WriteActorWritesLatencyHistogram;

// Scheduler signals
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled;
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity;
Expand Down
Loading
Loading