Skip to content

Commit

Permalink
Oltp EvWrite fixes (#12279)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Dec 4, 2024
1 parent 181d1e0 commit 5e818ab
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 10 deletions.
42 changes: 33 additions & 9 deletions ydb/core/kqp/common/kqp_tx_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
auto& shardInfo = ShardsInfo.at(shardId);
if (auto lockPtr = shardInfo.Locks.FindPtr(lock.GetKey()); lockPtr) {
if (lock.Proto.GetHasWrites()) {
AFL_ENSURE(!ReadOnly);
lockPtr->Lock.Proto.SetHasWrites(true);
}

Expand Down Expand Up @@ -186,7 +187,12 @@ class TKqpTransactionManager : public IKqpTransactionManager {
}

bool IsVolatile() const override {
return !HasOlapTable();
return !HasOlapTable()
&& !IsReadOnly()
&& !IsSingleShard();

// TODO: && !HasPersistentChannels;
// Note: currently persistent channels are never used
}

bool HasSnapshot() const override {
Expand All @@ -213,10 +219,15 @@ class TKqpTransactionManager : public IKqpTransactionManager {
return ShardsIds.size();
}

bool NeedCommit() const override {
const bool dontNeedCommit = IsReadOnly() && (IsSingleShard() || HasSnapshot());
return !dontNeedCommit;
}

void StartPrepare() override {
AFL_ENSURE(!CollectOnly);
AFL_ENSURE(State == ETransactionState::COLLECTING);
AFL_ENSURE(!IsReadOnly());
AFL_ENSURE(NeedCommit());

THashSet<ui64> sendingColumnShardsSet;
THashSet<ui64> receivingColumnShardsSet;
Expand All @@ -242,8 +253,6 @@ class TKqpTransactionManager : public IKqpTransactionManager {
shardInfo.State = EShardState::PREPARING;
}

Y_ABORT_UNLESS(!ReceivingShards.empty());

constexpr size_t minArbiterMeshSize = 5;
if ((IsVolatile() &&
ReceivingShards.size() >= minArbiterMeshSize))
Expand Down Expand Up @@ -286,7 +295,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {

TPrepareInfo GetPrepareTransactionInfo() override {
AFL_ENSURE(State == ETransactionState::PREPARING);
AFL_ENSURE(!ReceivingShards.empty());
AFL_ENSURE(!ReceivingShards.empty() || !SendingShards.empty());

TPrepareInfo result {
.SendingShards = SendingShards,
Expand Down Expand Up @@ -323,7 +332,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
AFL_ENSURE(State == ETransactionState::PREPARING
|| (State == ETransactionState::COLLECTING
&& IsSingleShard()));
AFL_ENSURE(!IsReadOnly());
AFL_ENSURE(NeedCommit());
State = ETransactionState::EXECUTING;

for (auto& [_, shardInfo] : ShardsInfo) {
Expand Down Expand Up @@ -360,9 +369,24 @@ class TKqpTransactionManager : public IKqpTransactionManager {
AFL_ENSURE(shardInfo.State == EShardState::EXECUTING);
shardInfo.State = EShardState::FINISHED;

// Either all shards committed or all shards failed,
// so we need to wait only for one answer from ReceivingShards.
return ReceivingShards.contains(shardId) || IsSingleShard();
if (IsSingleShard() || ReceivingShards.contains(shardId)) {
// Either all shards committed write or all shards failed,
// so we need to wait only for one answer from ReceivingShards.
return true;
} else if (IsReadOnly() && !HasSnapshot()) {
AFL_ENSURE(ReceivingShards.empty());
// NOTE: In this case we have a possible RW transaction, that didn't write anything.
// For example, statement 'update dst set ... where ...' or 'insert into dst select from src where ...'.
// So, it's ok to use distributed commit in this case,
// because in general case (possible RW tx is RW tx) tx will be executed faster
// due to absence of taking snapshot (up to 10ms).

// In case of read only multishard tx without snapshot,
// we need to wait for all shards answers (to check locks).
AFL_ENSURE(SendingShards.erase(shardId) == 1);
return SendingShards.empty();
}
return false;
}

private:
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/kqp_tx_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class IKqpTransactionManager {
virtual const THashSet<ui64>& GetShards() const = 0;
virtual ui64 GetShardsCount() const = 0;

virtual bool NeedCommit() const = 0;

virtual void StartPrepare() = 0;

struct TPrepareInfo {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1763,7 +1763,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
for (auto& [_, info] : WriteInfos) {
info.WriteTableActor->FlushBuffers();
}
if (TxManager->IsReadOnly()) {

if (!TxManager->NeedCommit()) {
Rollback();
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4309,6 +4309,14 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
// Empty replace
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/DataShard2` SELECT * FROM `/Root/DataShard` WHERE Col2 == 'not exists';
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(1000))).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
auto it = client.StreamExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/DataShard2`;
Expand Down

0 comments on commit 5e818ab

Please sign in to comment.