Skip to content

Commit

Permalink
remove validation throught available reask in timeout (#8609)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 2, 2024
1 parent 892228d commit b8d1289
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,12 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId
return true;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_remove_prepared_tx_insertion")("write_id", (ui64)writeId)("tx_id", txId);
return false;
}
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_remove_removed_tx_insertion")("write_id", (ui64)writeId)("tx_id", txId);
return true;
}
return false;
}

void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort) {
Expand Down
24 changes: 15 additions & 9 deletions ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,25 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
auto copy = *op;
AFL_VERIFY(copy.WaitShardsBrokenFlags.erase(TabletId))("remove_tablet_id", TabletId);
copy.TxBroken = copy.TxBroken.value_or(false) || BrokenFlag;
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString());
if (copy.WaitShardsBrokenFlags.erase(TabletId)) {
copy.TxBroken = copy.TxBroken.value_or(false) || BrokenFlag;
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString());
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "repeated shard broken_flag info")("shard_id", TabletId);
}
return true;
}
virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
AFL_VERIFY(op->WaitShardsBrokenFlags.erase(TabletId))("remove_tablet_id", TabletId);
op->TxBroken = op->TxBroken.value_or(false) || BrokenFlag;
op->SendBrokenFlagAck(*Self, TabletId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))(
"receive", TabletId);
op->InitializeRequests(*Self);
if (op->WaitShardsBrokenFlags.erase(TabletId)) {
op->TxBroken = op->TxBroken.value_or(false) || BrokenFlag;
op->SendBrokenFlagAck(*Self, TabletId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))(
"receive", TabletId);
op->InitializeRequests(*Self);
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "repeated shard broken_flag info")("shard_id", TabletId);
}
}

public:
Expand Down

0 comments on commit b8d1289

Please sign in to comment.