Skip to content

Commit

Permalink
TEvProposeTransaction handler (ydb-platform#1963)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Feb 29, 2024
1 parent 652ecef commit def1b93
Show file tree
Hide file tree
Showing 7 changed files with 427 additions and 54 deletions.
3 changes: 2 additions & 1 deletion ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
{
for (auto& [_, value] : Operations_) {
if (value.GetTabletId() == tabletId) {
return value.HasReadOperations();
// reading from a topic and writing to a topic contain read operations
return value.HasReadOperations() || value.HasWriteOperations();
}
}
return false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ struct TEvPQ {
ui64 Step;
ui64 TxId;
TVector<NKikimrPQ::TPartitionOperation> Operations;
TActorId SupportivePartitionActor;
};

struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();

if (!txBody.GetImmediate()) {
if (!txBody.GetImmediate() || txBody.HasWriteId()) {
ReplyPropose(ctx,
event,
NKikimrPQ::TEvProposeTransactionResult::ABORTED);
Expand Down Expand Up @@ -1616,6 +1616,10 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
Y_UNUSED(ctx);
bool predicate = true;

if (tx.SupportivePartitionActor != TActorId()) {
return false;
}

for (auto& operation : tx.Operations) {
const TString& consumer = operation.GetConsumer();

Expand Down
115 changes: 89 additions & 26 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,6 @@ void TPersQueue::EndWriteTabletState(const NKikimrClient::TResponse& resp, const

void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx)
{

auto& resp = ev->Get()->Record;

switch (resp.GetCookie()) {
Expand Down Expand Up @@ -2636,7 +2635,16 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
// - иначе
// - добавить сообщение в очередь для партиции
//
const TPartitionId& partitionId = TxWrites.at(writeId).Partitions.at(originalPartitionId);
const TTxWriteInfo& writeInfo = TxWrites.at(writeId);
if (writeInfo.TxId.Defined()) {
ReplyError(ctx,
responseCookie,
NPersQueue::NErrorCode::BAD_REQUEST,
"it is forbidden to write after a commit");
return;
}

const TPartitionId& partitionId = writeInfo.Partitions.at(originalPartitionId);
Y_ABORT_UNLESS(Partitions.contains(partitionId));
TPartitionInfo& partition = Partitions.at(partitionId);

Expand Down Expand Up @@ -3140,7 +3148,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
", is_write=" << isWriteOperation);
}

if ((TabletState != NKikimrPQ::ENormal) || txBody.HasWriteId()) {
if (TabletState != NKikimrPQ::ENormal) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
Expand All @@ -3151,19 +3159,32 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
// TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
//

if (txBody.GetImmediate()) {
//
// FIXME(abcdef): вместо Y_ABORT_UNLESS отправлять TEvProposeTransactionResult с кодом ошибки
//
Y_ABORT_UNLESS(txBody.OperationsSize() > 0);
if (txBody.OperationsSize() <= 0) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
return;
}

TPartitionId partitionId =
MakePartitionId(txBody.GetOperations(0).GetPartitionId(),
txBody.HasWriteId() ? TMaybe<ui64>(txBody.GetWriteId()) : Nothing());
auto i = Partitions.find(partitionId);
Y_ABORT_UNLESS(i != Partitions.end());
TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
if (!partitionId.Defined()) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
return;
}

if (!Partitions.contains(*partitionId)) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
return;
}

ctx.Send(i->second.Actor, ev.Release());
if (txBody.GetImmediate() && !txBody.HasWriteId()) {
const TPartitionInfo& partition = Partitions.at(*partitionId);

ctx.Send(partition.Actor, ev.Release());
} else {
EvProposeTransactionQueue.emplace_back(ev.Release());

Expand Down Expand Up @@ -3444,6 +3465,14 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
case NKikimrPQ::TTransaction::UNKNOWN:
tx.OnProposeTransaction(event, GetAllowedStep(),
TabletID());

if (tx.WriteId.Defined()) {
ui64 writeId = *tx.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
writeInfo.TxId = tx.TxId;
}

CheckTxState(ctx, tx);
break;
case NKikimrPQ::TTransaction::PREPARING:
Expand Down Expand Up @@ -3690,20 +3719,35 @@ void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx,
}
}

TPartitionId TPersQueue::MakePartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId) const
TMaybe<TPartitionId> TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransaction& txBody) const
{
Y_ABORT_UNLESS(!writeId.Defined());
return TPartitionId{originalPartitionId};
ui32 partitionId = txBody.GetOperations(0).GetPartitionId();

if (txBody.HasWriteId()) {
ui64 writeId = txBody.GetWriteId();
if (!TxWrites.contains(writeId)) {
return Nothing();
}

const TTxWriteInfo& writeInfo = TxWrites.at(writeId);
if (!writeInfo.Partitions.contains(partitionId)) {
return Nothing();
}

return writeInfo.Partitions.at(partitionId);
} else {
return TPartitionId(partitionId);
}
}

void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
TDistributedTransaction& tx)
{
THashMap<TPartitionId, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events;
THashMap<ui32, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events;

for (auto& operation : tx.Operations) {
TPartitionId partitionId = MakePartitionId(operation.GetPartitionId(), tx.WriteId);
auto& event = events[partitionId];
ui32 originalPartitionId = operation.GetPartitionId();
auto& event = events[originalPartitionId];
if (!event) {
event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
}
Expand All @@ -3713,11 +3757,30 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
operation.GetEnd());
}

for (auto& [partition, event] : events) {
auto p = Partitions.find(partition);
Y_ABORT_UNLESS(p != Partitions.end());
if (tx.WriteId.Defined()) {
ui64 writeId = *tx.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
const TTxWriteInfo& writeInfo = TxWrites.at(writeId);

ctx.Send(p->second.Actor, event.release());
for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) {
Y_ABORT_UNLESS(Partitions.contains(partitionId));
const TPartitionInfo& partition = Partitions.at(partitionId);

auto& event = events[originalPartitionId];
if (!event) {
event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
}

event->SupportivePartitionActor = partition.Actor;
}
}

for (auto& [originalPartitionId, event] : events) {
TPartitionId partitionId(originalPartitionId);
Y_ABORT_UNLESS(Partitions.contains(partitionId));
const TPartitionInfo& partition = Partitions.at(partitionId);

ctx.Send(partition.Actor, event.release());
}

tx.PartitionRepliesCount = 0;
Expand All @@ -3732,7 +3795,7 @@ void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx,
for (ui32 partitionId : tx.Partitions) {
auto event = std::make_unique<TEvPQ::TEvTxCommit>(tx.Step, tx.TxId);

auto p = Partitions.find(MakePartitionId(partitionId, tx.WriteId));
auto p = Partitions.find(TPartitionId(partitionId));
Y_ABORT_UNLESS(p != Partitions.end());

ctx.Send(p->second.Actor, event.release());
Expand All @@ -3750,7 +3813,7 @@ void TPersQueue::SendEvTxRollbackToPartitions(const TActorContext& ctx,
for (ui32 partitionId : tx.Partitions) {
auto event = std::make_unique<TEvPQ::TEvTxRollback>(tx.Step, tx.TxId);

auto p = Partitions.find(MakePartitionId(partitionId, tx.WriteId));
auto p = Partitions.find(TPartitionId(partitionId));
Y_ABORT_UNLESS(p != Partitions.end());

ctx.Send(p->second.Actor, event.release());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests;
TMaybe<ui64> TabletGeneration;

TPartitionId MakePartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId) const;
TMaybe<TPartitionId> FindPartitionId(const NKikimrPQ::TDataTransaction& txBody) const;

void InitPlanStep(const NKikimrPQ::TTabletTxInfo& info = {});
void SavePlanStep(NKikimrPQ::TTabletTxInfo& info);
Expand Down
Loading

0 comments on commit def1b93

Please sign in to comment.