Skip to content

Commit

Permalink
The race between TEvProposeTransaction and TEvLockStatus (#8517) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Sep 4, 2024
1 parent b9eddd2 commit ae19a36
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 63 deletions.
17 changes: 8 additions & 9 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2167,7 +2167,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction*>;
using TEvWriteTxs = THashMap<ui64, NKikimrDataEvents::TEvWrite*>;
using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>;
using TTopicTabletTxs = NTopic::TTopicOperationTransactions;

void ContinueExecute() {
if (Stats) {
Expand Down Expand Up @@ -2424,10 +2424,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

for (auto& [_, tx] : topicTxs) {
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*tx.MutableSendingShards() = sendingShards;
*tx.MutableReceivingShards() = receivingShards;
for (auto& [_, t] : topicTxs) {
t.tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*t.tx.MutableSendingShards() = sendingShards;
*t.tx.MutableReceivingShards() = receivingShards;
YQL_ENSURE(!arbiter);
}
}
Expand Down Expand Up @@ -2589,13 +2589,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
writeId = Request.TopicOperations.GetWriteId();
}

for (auto& tx : topicTxs) {
auto tabletId = tx.first;
auto& transaction = tx.second;
for (auto& [tabletId, t] : topicTxs) {
auto& transaction = t.tx;

auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();

if (writeId.Defined()) {
if (t.hasWrite && writeId.Defined()) {
auto* w = transaction.MutableWriteId();
w->SetNodeId(SelfId().NodeId());
w->SetKeyId(*writeId);
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
HasWriteOperations_ = true;
}

void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
{
Y_ABORT_UNLESS(TabletId_.Defined());
Y_ABORT_UNLESS(Partition_.Defined());

auto& tx = txs[*TabletId_];
auto& t = txs[*TabletId_];

for (auto& [consumer, operations] : Operations_) {
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
auto [begin, end] = operations.GetRange();
o->SetBegin(begin);
Expand All @@ -123,12 +123,13 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
}

if (HasWriteOperations_) {
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
o->SetPath(*Topic_);
if (SupportivePartition_.Defined()) {
o->SetSupportivePartition(*SupportivePartition_);
}
t.hasWrite = true;
}
}

Expand Down Expand Up @@ -355,7 +356,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
return true;
}

void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
{
for (auto& [_, operations] : Operations_) {
operations.BuildTopicTxs(txs);
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/topics/kqp_topics.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class TConsumerOperations {
TDisjointIntervalTree<ui64> Offsets_;
};

// A topic transaction under construction together with a flag telling
// whether it carries write operations.
struct TTopicOperationTransaction {
    // Body of the data transaction; partition operations are appended to it
    // by TTopicPartitionOperations::BuildTopicTxs.
    NKikimrPQ::TDataTransaction tx;

    // Becomes true once BuildTopicTxs adds a write operation to `tx`.
    bool hasWrite{false};
};

using TTopicOperationTransactions = THashMap<ui64, TTopicOperationTransaction>;

class TTopicPartitionOperations {
public:
bool IsValid() const;
Expand All @@ -52,7 +59,7 @@ class TTopicPartitionOperations {
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
void BuildTopicTxs(TTopicOperationTransactions &txs);

void Merge(const TTopicPartitionOperations& rhs);

Expand Down Expand Up @@ -109,7 +116,7 @@ class TTopicOperations {
Ydb::StatusIds_StatusCode& status,
TString& message);

void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
void BuildTopicTxs(TTopicOperationTransactions &txs);

void Merge(const TTopicOperations& rhs);

Expand Down
126 changes: 82 additions & 44 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1041,16 +1041,21 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,
for (size_t i = 0; i != info.TxWritesSize(); ++i) {
auto& txWrite = info.GetTxWrites(i);
const TWriteId writeId = GetWriteId(txWrite);
ui32 partitionId = txWrite.GetOriginalPartitionId();
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());

TxWrites[writeId].Partitions.emplace(partitionId, shadowPartitionId);
TTxWriteInfo& writeInfo = TxWrites[writeId];
if (txWrite.HasOriginalPartitionId()) {
ui32 partitionId = txWrite.GetOriginalPartitionId();
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());

AddSupportivePartition(shadowPartitionId);
CreateSupportivePartitionActor(shadowPartitionId, ctx);
SubscribeWriteId(writeId, ctx);
writeInfo.Partitions.emplace(partitionId, shadowPartitionId);

AddSupportivePartition(shadowPartitionId);
CreateSupportivePartitionActor(shadowPartitionId, ctx);

NextSupportivePartitionId = Max(NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1);
}

NextSupportivePartitionId = Max(NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1);
SubscribeWriteId(writeId, ctx);
}

NewSupportivePartitions.clear();
Expand Down Expand Up @@ -3283,7 +3288,7 @@ bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& ope
TPartitionId partitionId(operation.GetPartitionId(),
writeId,
operation.GetSupportivePartition());
PQ_LOG_D("partitionId=" << partitionId);
PQ_LOG_D("PartitionId " << partitionId << " for WriteId " << writeId);
return Partitions.contains(partitionId);
}

Expand All @@ -3294,7 +3299,6 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
}

const TWriteId writeId = GetWriteId(txBody);
PQ_LOG_D("writeId=" << writeId);

for (auto& operation : txBody.GetOperations()) {
auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
Expand All @@ -3320,7 +3324,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
const NKikimrPQ::TDataTransaction& txBody = event.GetData();

if (TabletState != NKikimrPQ::ENormal) {
PQ_LOG_D("invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
PQ_LOG_D("TxId " << event.GetTxId() << " invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::ERROR,
Expand All @@ -3334,7 +3338,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
//

if (txBody.OperationsSize() <= 0) {
PQ_LOG_D("empty list of operations");
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::BAD_REQUEST,
Expand All @@ -3344,7 +3348,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
}

if (!CheckTxWriteOperations(txBody)) {
PQ_LOG_D("invalid WriteId " << txBody.GetWriteId());
PQ_LOG_D("TxId " << event.GetTxId() << " invalid WriteId " << txBody.GetWriteId());
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::BAD_REQUEST,
Expand All @@ -3353,9 +3357,36 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
return;
}

if (txBody.HasWriteId()) {
const TWriteId writeId = GetWriteId(txBody);
if (!TxWrites.contains(writeId)) {
PQ_LOG_D("TxId " << event.GetTxId() << " unknown WriteId " << writeId);
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::BAD_REQUEST,
"unknown WriteId",
ctx);
return;
}

TTxWriteInfo& writeInfo = TxWrites.at(writeId);
if (writeInfo.Deleting) {
PQ_LOG_W("TxId " << event.GetTxId() << " WriteId " << writeId << " will be deleted");
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::BAD_REQUEST,
"WriteId will be deleted",
ctx);
return;
}

writeInfo.TxId = event.GetTxId();
PQ_LOG_D("TxId " << event.GetTxId() << " has WriteId " << writeId);
}

TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
if (!partitionId.Defined()) {
PQ_LOG_D("unknown partition for WriteId " << txBody.GetWriteId());
PQ_LOG_W("TxId " << event.GetTxId() << " unknown partition for WriteId " << txBody.GetWriteId());
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::INTERNAL,
Expand Down Expand Up @@ -3568,13 +3599,15 @@ bool TPersQueue::CanProcessTxWrites() const
// Sends TEvSubscribeLock for the given WriteId to the long-tx service
// addressed by writeId.NodeId.
//
// writeId - identifier whose KeyId/NodeId are put into the subscribe event
// ctx     - actor context used to send the event
void TPersQueue::SubscribeWriteId(const TWriteId& writeId,
                                  const TActorContext& ctx)
{
    PQ_LOG_D("send TEvSubscribeLock for WriteId " << writeId);

    const auto longTxService = NLongTxService::MakeLongTxServiceID(writeId.NodeId);
    ctx.Send(longTxService,
             new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId.KeyId, writeId.NodeId));
}

// Sends TEvUnsubscribeLock for the given WriteId to the long-tx service
// addressed by writeId.NodeId.
//
// writeId - identifier whose KeyId/NodeId are put into the unsubscribe event
// ctx     - actor context used to send the event
void TPersQueue::UnsubscribeWriteId(const TWriteId& writeId,
                                    const TActorContext& ctx)
{
    PQ_LOG_D("send TEvUnsubscribeLock for WriteId " << writeId);

    const auto longTxService = NLongTxService::MakeLongTxServiceID(writeId.NodeId);
    ctx.Send(longTxService,
             new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId.KeyId, writeId.NodeId));
}
Expand Down Expand Up @@ -3876,11 +3909,16 @@ void TPersQueue::SavePlanStep(NKikimrPQ::TTabletTxInfo& info)
void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info)
{
for (auto& [writeId, write] : TxWrites) {
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
if (write.Partitions.empty()) {
auto* txWrite = info.MutableTxWrites()->Add();
SetWriteId(*txWrite, writeId);
txWrite->SetOriginalPartitionId(partitionId);
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
} else {
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
auto* txWrite = info.MutableTxWrites()->Add();
SetWriteId(*txWrite, writeId);
txWrite->SetOriginalPartitionId(partitionId);
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
}
}
}

Expand Down Expand Up @@ -4325,6 +4363,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED);

PQ_LOG_D("delete partitions for TxId " << tx.TxId);
BeginDeletePartitions(tx);

tx.State = NKikimrPQ::TTransaction::EXECUTED;
PQ_LOG_D("TxId " << tx.TxId <<
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
Expand All @@ -4343,8 +4384,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

case NKikimrPQ::TTransaction::WAIT_RS_ACKS:
PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive() <<
", WriteIdIsDisabled " << WriteIdIsDisabled(tx.WriteId));
if (tx.HaveAllRecipientsReceive() && WriteIdIsDisabled(tx.WriteId)) {
", AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted(tx.WriteId));
if (tx.HaveAllRecipientsReceive() && AllSupportivePartitionsHaveBeenDeleted(tx.WriteId)) {
DeleteTx(tx);
// implicitly switch to the state DELETING
}
Expand All @@ -4369,7 +4410,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
}
}

bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const
{
if (!writeId.Defined()) {
return true;
Expand All @@ -4380,26 +4421,21 @@ bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
TabletID(), writeId->NodeId, writeId->KeyId);
const TTxWriteInfo& writeInfo = TxWrites.at(*writeId);

bool disabled =
(writeInfo.LongTxSubscriptionStatus != NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) &&
PQ_LOG_D("WriteId " << *writeId <<
" Partitions.size=" << writeInfo.Partitions.size());
bool deleted =
writeInfo.Partitions.empty()
;

PQ_LOG_D("WriteId " << *writeId << " is " << (disabled ? "disabled" : "enabled"));

return disabled;
return deleted;
}

void TPersQueue::DeleteWriteId(const TMaybe<TWriteId>& writeId)
{
if (!writeId.Defined()) {
if (!writeId.Defined() || !TxWrites.contains(*writeId)) {
return;
}

Y_ABORT_UNLESS(TxWrites.contains(*writeId),
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
TabletID(), writeId->NodeId, writeId->KeyId);

PQ_LOG_D("delete WriteId " << *writeId);
TxWrites.erase(*writeId);
}
Expand Down Expand Up @@ -4729,7 +4765,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
}
}

void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx)
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev)
{
PQ_LOG_D("Handle TEvLongTxService::TEvLockStatus " << ev->Get()->Record.ShortDebugString());

Expand All @@ -4750,22 +4786,14 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
return;
}

if (!writeInfo.TxId.Defined()) {
PQ_LOG_D("delete write info for WriteId " << writeId);
// the message TEvProposeTransaction will not come anymore
BeginDeletePartitions(writeInfo);
if (writeInfo.TxId.Defined()) {
// the message `TEvProposeTransaction` has already arrived
PQ_LOG_D("there is already a transaction TxId " << writeInfo.TxId << " for WriteId " << writeId);
return;
}

ui64 txId = *writeInfo.TxId;
PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << txId);

auto* tx = GetTransaction(ctx, txId);
if (!tx ||
(tx->State == NKikimrPQ::TTransaction::EXECUTED) ||
(tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS)) {
BeginDeletePartitions(writeInfo);
}
PQ_LOG_D("delete partitions for WriteId " << writeId);
BeginDeletePartitions(writeInfo);
}

void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
Expand Down Expand Up @@ -4865,6 +4893,16 @@ void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
writeInfo.Deleting = true;
}

// Starts deleting the partitions recorded in TxWrites for the WriteId of the
// given transaction. Does nothing when the transaction has no WriteId or the
// WriteId is not present in TxWrites.
void TPersQueue::BeginDeletePartitions(const TDistributedTransaction& tx)
{
    if (!tx.WriteId.Defined()) {
        return;
    }

    auto writeIt = TxWrites.find(*tx.WriteId);
    if (writeIt == TxWrites.end()) {
        return;
    }

    // Delegate to the overload that works on the write info itself.
    BeginDeletePartitions(writeIt->second);
}

// Builds the log prefix for this tablet: "[PQ: <TabletID>] ".
TString TPersQueue::LogPrefix() const {
    TStringBuilder prefix;
    prefix << "[PQ: " << TabletID() << "] ";
    return prefix;
}
Expand Down Expand Up @@ -4919,7 +4957,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvPartitionScaleStatusChanged, Handle);
HFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
hFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);
Expand Down
Loading

0 comments on commit ae19a36

Please sign in to comment.