Skip to content

Commit

Permalink
immediate writing with no tx (#8697)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 4, 2024
1 parent 4c66728 commit b666aa2
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 27 deletions.
4 changes: 0 additions & 4 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Complex
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Simple
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalDataSource
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme [*/*]*
ydb/core/kqp/ut/scheme KqpOlapScheme.DropThenAddColumn
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
if (!writeMeta.HasLongTxId()) {
auto operation = Self->OperationsManager->GetOperationVerified((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
operation->OnWriteFinish(txc, aggr->GetWriteIds());
if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
operation->OnWriteFinish(txc, aggr->GetWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Self->OperationsManager->AddTemporaryTxLink(operation->GetLockId());
Self->OperationsManager->CommitTransactionOnExecute(*Self, operation->GetLockId(), txc, Self->GetLastTxSnapshot());
} else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
NKikimrTxColumnShard::TCommitWriteTxBody proto;
proto.SetLockId(operation->GetLockId());
TString txBody;
Expand Down Expand Up @@ -145,11 +150,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto op = Self->GetOperationsManager().GetOperationVerified(NOlap::TWriteId(writeMeta.GetWriteId()));
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock) {
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetTableId(),
buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
}
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
}

}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
Expand Down
31 changes: 16 additions & 15 deletions ydb/core/tx/columnshard/operations/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void TOperationsManager::CommitTransactionOnExecute(
opPtr->CommitOnExecute(owner, txc, snapshot);
commited.emplace_back(opPtr);
}
OnTransactionFinishOnExecute(commited, txId, txc);
OnTransactionFinishOnExecute(commited, lock, txId, txc);
}

void TOperationsManager::CommitTransactionOnComplete(
Expand All @@ -101,7 +101,7 @@ void TOperationsManager::CommitTransactionOnComplete(
opPtr->CommitOnComplete(owner, snapshot);
commited.emplace_back(opPtr);
}
OnTransactionFinishOnComplete(commited, txId);
OnTransactionFinishOnComplete(commited, lock, txId);
}

void TOperationsManager::AbortTransactionOnExecute(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
Expand All @@ -118,7 +118,7 @@ void TOperationsManager::AbortTransactionOnExecute(TColumnShard& owner, const ui
aborted.emplace_back(opPtr);
}

OnTransactionFinishOnExecute(aborted, txId, txc);
OnTransactionFinishOnExecute(aborted, *lock, txId, txc);
}

void TOperationsManager::AbortTransactionOnComplete(TColumnShard& owner, const ui64 txId) {
Expand All @@ -135,7 +135,7 @@ void TOperationsManager::AbortTransactionOnComplete(TColumnShard& owner, const u
aborted.emplace_back(opPtr);
}

OnTransactionFinishOnComplete(aborted, txId);
OnTransactionFinishOnComplete(aborted, *lock, txId);
}

TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) const {
Expand All @@ -147,24 +147,20 @@ TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) c
}

void TOperationsManager::OnTransactionFinishOnExecute(
const TVector<TWriteOperation::TPtr>& operations, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
const ui64 lockId = GetLockForTxVerified(txId);
auto itLock = LockFeatures.find(lockId);
AFL_VERIFY(itLock != LockFeatures.end());
const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
for (auto&& op : operations) {
RemoveOperationOnExecute(op, txc);
}
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::OperationTxIds>().Key(txId, lockId).Delete();
db.Table<Schema::OperationTxIds>().Key(txId, lock.GetLockId()).Delete();
}

void TOperationsManager::OnTransactionFinishOnComplete(
const TVector<TWriteOperation::TPtr>& operations, const ui64 txId) {
const ui64 lockId = GetLockForTxVerified(txId);
auto itLock = LockFeatures.find(lockId);
AFL_VERIFY(itLock != LockFeatures.end());
itLock->second.RemoveInteractions(InteractionsContext);
LockFeatures.erase(lockId);
const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId) {
{
lock.RemoveInteractions(InteractionsContext);
LockFeatures.erase(lock.GetLockId());
}
Tx2Lock.erase(txId);
for (auto&& op : operations) {
RemoveOperationOnComplete(op);
Expand Down Expand Up @@ -233,6 +229,11 @@ EOperationBehaviour TOperationsManager::GetBehaviour(const NEvents::TDataEvents:
return EOperationBehaviour::Undefined;
}

if (!evWrite.Record.HasLockTxId() && !evWrite.Record.HasLockNodeId() &&
evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) {
return EOperationBehaviour::NoTxWrite;
}

if (evWrite.Record.HasTxId() && evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_PREPARE) {
return EOperationBehaviour::InTxWrite;
}
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/tx/columnshard/operations/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class TOperationsManager {
TWriteId LastWriteId = TWriteId(0);

public:

bool Load(NTabletFlatExecutor::TTransactionContext& txc);
void AddEventForTx(TColumnShard& owner, const ui64 txId, const std::shared_ptr<NOlap::NTxInteractions::ITxEventWriter>& writer);
void AddEventForLock(TColumnShard& owner, const ui64 lockId, const std::shared_ptr<NOlap::NTxInteractions::ITxEventWriter>& writer);
Expand All @@ -139,6 +140,9 @@ class TOperationsManager {
TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot);
void CommitTransactionOnComplete(
TColumnShard& owner, const ui64 txId, const NOlap::TSnapshot& snapshot);
void AddTemporaryTxLink(const ui64 lockId) {
AFL_VERIFY(Tx2Lock.emplace(lockId, lockId).second);
}
void LinkTransactionOnExecute(const ui64 lockId, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
void LinkTransactionOnComplete(const ui64 lockId, const ui64 txId);
void AbortTransactionOnExecute(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
Expand Down Expand Up @@ -198,7 +202,8 @@ class TOperationsManager {
TWriteId BuildNextWriteId();
void RemoveOperationOnExecute(const TWriteOperation::TPtr& op, NTabletFlatExecutor::TTransactionContext& txc);
void RemoveOperationOnComplete(const TWriteOperation::TPtr& op);
void OnTransactionFinishOnExecute(const TVector<TWriteOperation::TPtr>& operations, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
void OnTransactionFinishOnComplete(const TVector<TWriteOperation::TPtr>& operations, const ui64 txId);
void OnTransactionFinishOnExecute(const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId,
NTabletFlatExecutor::TTransactionContext& txc);
void OnTransactionFinishOnComplete(const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId);
};
} // namespace NKikimr::NColumnShard
6 changes: 5 additions & 1 deletion ydb/core/tx/columnshard/operations/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ void TWriteOperation::CommitOnComplete(TColumnShard& owner, const NOlap::TSnapsh
owner.UpdateInsertTableCounters();
}

void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds) {
void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds, const bool ephemeralFlag) {
Y_ABORT_UNLESS(Status == EOperationStatus::Started);
Status = EOperationStatus::Prepared;
GlobalWriteIds = globalWriteIds;

if (ephemeralFlag) {
return;
}

NIceDb::TNiceDb db(txc.DB);
NKikimrTxColumnShard::TInternalOperationData proto;
ToProto(proto);
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/operations/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ enum class EOperationBehaviour : ui32 {
InTxWrite = 2,
WriteWithLock = 3,
CommitWriteLock = 4,
AbortWriteLock = 5
AbortWriteLock = 5,
NoTxWrite = 6
};

class TWriteOperation {
Expand All @@ -61,7 +62,7 @@ class TWriteOperation {

void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source,
const std::shared_ptr<NOlap::ISnapshotSchema>& schema, const TActorContext& ctx);
void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds);
void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds, const bool ephemeralFlag);
void CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const;
void CommitOnComplete(TColumnShard& owner, const NOlap::TSnapshot& snapshot) const;
void AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const;
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/data_events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ struct TDataEvents {
return result;
}

static std::unique_ptr<TEvWriteResult> BuildCompleted(const ui64 origin) {
auto result = std::make_unique<TEvWriteResult>();
result->Record.SetOrigin(origin);
result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
return result;
}

static std::unique_ptr<TEvWriteResult> BuildCompleted(const ui64 origin, const ui64 txId) {
auto result = std::make_unique<TEvWriteResult>();
result->Record.SetOrigin(origin);
Expand Down

0 comments on commit b666aa2

Please sign in to comment.