diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h index 5ca66fe90a34..96d8f09e5e6d 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h @@ -5,10 +5,10 @@ namespace NKikimr::NColumnShard { class TTxInsertTableCleanup: public TTransactionBase { private: - THashSet WriteIdsToAbort; + THashSet WriteIdsToAbort; std::shared_ptr BlobsAction; public: - TTxInsertTableCleanup(TColumnShard* self, THashSet&& writeIdsToAbort) + TTxInsertTableCleanup(TColumnShard* self, THashSet&& writeIdsToAbort) : TBase(self) , WriteIdsToAbort(std::move(writeIdsToAbort)) { Y_ABORT_UNLESS(WriteIdsToAbort.size() || self->InsertTable->GetAborted().size()); diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 0de9017a8516..dd2d4f6397f3 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -1,9 +1,11 @@ #include "tx_write.h" + +#include #include namespace NKikimr::NColumnShard { -bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) { +bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) { NKikimrTxColumnShard::TLogicalMetadata meta; meta.SetNumRows(batch->GetRowsCount()); meta.SetRawBytes(batch->GetRawBytes()); @@ -23,9 +25,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali auto schemeVersion = batch.GetAggregation().GetSchemaVersion(); auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion); - NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, - meta, tableSchema->GetVersion(), - batch->GetData()); + auto userData = std::make_shared(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData()); + NOlap::TInsertedData insertData(writeId, userData); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { Self->UpdateInsertTableCounters(); @@ -36,7 +37,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { TMemoryProfileGuard mpg("TTxWrite::Execute"); - NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute"); + NActors::TLogContextGuard logGuard = + NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute"); ACFL_DEBUG("event", "start_execute"); const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); for (auto&& aggr : buffer.GetAggregations()) { @@ -45,33 +47,27 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { txc.DB.NoMoreReadsForTx(); TWriteOperation::TPtr operation; if (writeMeta.HasLongTxId()) { + NIceDb::TNiceDb db(txc.DB); + const TInsertWriteId insertWriteId = + Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId(), writeMeta.GetGranuleShardingVersion()); + aggr->AddInsertWriteId(insertWriteId); if (writeMeta.IsGuaranteeWriter()) { AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size()); } else { AFL_VERIFY(aggr->GetSplittedBlobs().size() <= 1)("count", aggr->GetSplittedBlobs().size()); } + if (aggr->GetSplittedBlobs().size() == 1) { + AFL_VERIFY(InsertOneBlob(txc, aggr->GetSplittedBlobs().front(), insertWriteId))("write_id", writeMeta.GetWriteId())( + "insert_write_id", insertWriteId); + } } else { - operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); - Y_ABORT_UNLESS(operation); + operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId()); Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); - } - - auto writeId = TWriteId(writeMeta.GetWriteId()); - if (!operation) { - NIceDb::TNiceDb db(txc.DB); - writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId(), writeMeta.GetGranuleShardingVersion()); - aggr->AddWriteId(writeId); - } - - for (auto&& i : aggr->GetSplittedBlobs()) { - if (operation) { - writeId = Self->BuildNextWriteId(txc); - aggr->AddWriteId(writeId); - } - - if (!InsertOneBlob(txc, i, writeId)) { - LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); - Self->Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_DUPLICATE); + for (auto&& i : aggr->GetSplittedBlobs()) { + const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc); + aggr->AddInsertWriteId(insertWriteId); + AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)( + "size", aggr->GetSplittedBlobs().size()); } } } @@ -88,9 +84,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { for (auto&& aggr : buffer.GetAggregations()) { const auto& writeMeta = aggr->GetWriteMeta(); if (!writeMeta.HasLongTxId()) { - auto operation = Self->OperationsManager->GetOperationVerified((TWriteId)writeMeta.GetWriteId()); + auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId()); Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); - operation->OnWriteFinish(txc, aggr->GetWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite); + operation->OnWriteFinish(txc, aggr->GetInsertWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite); if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) { auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID()); Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie()); @@ -119,8 +115,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie()); } } else { - Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1); - auto ev = std::make_unique(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + Y_ABORT_UNLESS(aggr->GetInsertWriteIds().size() == 1); + auto ev = std::make_unique( + Self->TabletID(), writeMeta, (ui64)aggr->GetInsertWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS); Results.emplace_back(std::move(ev), writeMeta.GetSource(), 0); } } @@ -129,7 +126,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { void TTxWrite::Complete(const TActorContext& ctx) { TMemoryProfileGuard mpg("TTxWrite::Complete"); - NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "complete"); + NActors::TLogContextGuard logGuard = + NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "complete"); const auto now = TMonotonic::Now(); const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); for (auto&& i : buffer.GetAddActions()) { @@ -149,7 +147,7 @@ void TTxWrite::Complete(const TActorContext& ctx) { for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) { const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta(); if (!writeMeta.HasLongTxId()) { - auto op = Self->GetOperationsManager().GetOperationVerified(NOlap::TWriteId(writeMeta.GetWriteId())); + auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId()); if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) { auto evWrite = std::make_shared(writeMeta.GetTableId(), buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey()); @@ -158,7 +156,6 @@ void TTxWrite::Complete(const TActorContext& ctx) { if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) { Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot()); } - } Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant()); Self->Counters.GetCSCounters().OnSuccessWriteResponse(); @@ -166,4 +163,4 @@ void TTxWrite::Complete(const TActorContext& ctx) { Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED); } -} +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h index 98de301e5e16..84ffbe7a9005 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h @@ -43,7 +43,7 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase { std::vector> ResultOperators; - bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId); + bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId); TStringBuilder TxPrefix() const { return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] "; diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index ca0785756ef9..f3a6b9e99db9 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -301,7 +301,7 @@ void TColumnShard::UpdateIndexCounters() { ui64 TColumnShard::MemoryUsage() const { ui64 memory = ProgressTxController->GetMemoryUsage() + ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) + - LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) + + LongTxWrites.size() * (sizeof(TInsertWriteId) + sizeof(TLongTxWriteInfo)) + LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) + (WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) + Counters.GetTabletCounters()->GetValue(COUNTER_PREPARED_RECORDS) * sizeof(NOlap::TInsertedData) + diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 130fd7960628..09cf1f4ef71f 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -41,7 +41,6 @@ void TTxInit::SetDefaults() { Self->CurrentSchemeShardId = 0; Self->LastSchemaSeqNo = { }; Self->ProcessingParams.reset(); - Self->LastWriteId = TWriteId{0}; Self->LastPlannedStep = 0; Self->LastPlannedTxId = 0; Self->LastCompletedTx = NOlap::TSnapshot::Zero(); @@ -73,7 +72,6 @@ bool TTxInit::Precharge(TTransactionContext& txc) { ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastSchemaSeqNoGeneration, Self->LastSchemaSeqNo.Generation); ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastSchemaSeqNoRound, Self->LastSchemaSeqNo.Round); ready = ready && Schema::GetSpecialProtoValue(db, Schema::EValueIds::ProcessingParams, Self->ProcessingParams); - ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastWriteId, Self->LastWriteId); ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep); ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId); ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); @@ -107,7 +105,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) ACFL_DEBUG("step", "TInsertTable::Load_Start"); TMemoryProfileGuard g("TTxInit/InsertTable"); auto localInsertTable = std::make_unique(); - if (!localInsertTable->Load(dbTable, TAppData::TimeProvider->Now())) { + if (!localInsertTable->Load(db, dbTable, TAppData::TimeProvider->Now())) { ACFL_ERROR("step", "TInsertTable::Load_Fails"); return false; } @@ -182,7 +180,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) } while (!rowset.EndOfSet()) { - const TWriteId writeId = TWriteId{ rowset.GetValue() }; + const TInsertWriteId writeId = (TInsertWriteId)rowset.GetValue(); const ui32 writePartId = rowset.GetValue(); NKikimrLongTxService::TLongTxId proto; Y_ABORT_UNLESS(proto.ParseFromString(rowset.GetValue())); diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 984d85798f9d..f1b298ac81e6 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -129,7 +129,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto result = std::make_unique(TabletID(), writeMeta, errCode); ctx.Send(writeMeta.GetSource(), result.release()); } else { - auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); + auto operation = OperationsManager->GetOperation((TOperationWriteId)writeMeta.GetWriteId()); Y_ABORT_UNLESS(operation); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetLockId(), ev->Get()->GetWriteResultStatus(), ev->Get()->GetErrorMessage() ? ev->Get()->GetErrorMessage() : "put data fails"); @@ -324,7 +324,8 @@ class TCommitOperation { return TConclusionStatus::Success(); } - std::unique_ptr CreateTxOperator(const NKikimrTxColumnShard::ETransactionKind kind) const { + std::unique_ptr CreateTxOperator( + const NKikimrTxColumnShard::ETransactionKind kind) const { AFL_VERIFY(ReceivingShards.size()); if (IsPrimary()) { return std::make_unique( @@ -429,7 +430,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const auto source = ev->Sender; const auto cookie = ev->Cookie; const auto behaviour = TOperationsManager::GetBehaviour(*ev->Get()); -// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString()); + // AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString()); if (behaviour == EOperationBehaviour::Undefined) { Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError( @@ -447,8 +448,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto commitOperation = std::make_shared(TabletID()); const auto sendError = [&](const TString& message, const NKikimrDataEvents::TEvWriteResult::EStatus status) { Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); - auto result = - NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message); ctx.Send(source, result.release(), 0, cookie); }; auto conclusionParse = commitOperation->Parse(*ev->Get()); @@ -466,7 +466,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor " != " + ::ToString(commitOperation->GetGeneration()), NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN); } else if (lockInfo->GetInternalGenerationCounter() != commitOperation->GetInternalGenerationCounter()) { - sendError("tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) + + sendError( + "tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) + " != " + ::ToString(commitOperation->GetInternalGenerationCounter()), NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN); } else { diff --git a/ydb/core/tx/columnshard/columnshard_common.cpp b/ydb/core/tx/columnshard/columnshard_common.cpp deleted file mode 100644 index d9f0dcc5e79b..000000000000 --- a/ydb/core/tx/columnshard/columnshard_common.cpp +++ /dev/null @@ -1,18 +0,0 @@ -#include "columnshard_common.h" -#include - -namespace NKikimr::NColumnShard { - -namespace { - -using EOperation = NArrow::EOperation; -using EAggregate = NArrow::EAggregate; -using TAssign = NSsa::TAssign; -using TAggregateAssign = NSsa::TAggregateAssign; - -} - -using EOperation = NArrow::EOperation; -using TPredicate = NOlap::TPredicate; - -} diff --git a/ydb/core/tx/columnshard/columnshard_common.h b/ydb/core/tx/columnshard/columnshard_common.h deleted file mode 100644 index 455f39a512cc..000000000000 --- a/ydb/core/tx/columnshard/columnshard_common.h +++ /dev/null @@ -1,94 +0,0 @@ -#pragma once -#include "engines/reader/common/description.h" -#include "engines/predicate/predicate.h" - -#include - -namespace NKikimr::NOlap { - struct TIndexInfo; -} - -namespace NKikimr::NColumnShard { - -using TReadDescription = NOlap::NReader::TReadDescription; -using IColumnResolver = NOlap::IColumnResolver; -using NOlap::TWriteId; - -class TBatchCache { -public: - using TUnifiedBlobId = NOlap::TUnifiedBlobId; - using TInsertedBatch = std::pair>; - - static constexpr ui32 MAX_COMMITTED_COUNT = 2 * TLimits::MIN_SMALL_BLOBS_TO_INSERT; - static constexpr ui32 MAX_INSERTED_COUNT = 2 * TLimits::MIN_SMALL_BLOBS_TO_INSERT; - static constexpr ui64 MAX_TOTAL_SIZE = 2 * TLimits::MIN_BYTES_TO_INSERT; - - TBatchCache() - : Inserted(MAX_INSERTED_COUNT) - , Committed(MAX_COMMITTED_COUNT) - {} - - void Insert(TWriteId writeId, const TUnifiedBlobId& blobId, const std::shared_ptr& batch) { - if (Bytes() + blobId.BlobSize() > MAX_TOTAL_SIZE) { - return; - } - InsertedBytes += blobId.BlobSize(); - Inserted.Insert(writeId, {blobId, batch}); - } - - void Commit(TWriteId writeId) { - auto it = Inserted.FindWithoutPromote(writeId); - if (it != Inserted.End()) { - auto& blobId = it->first; - InsertedBytes -= blobId.BlobSize(); - CommittedBytes += blobId.BlobSize(); - - Committed.Insert(blobId, it->second); - Inserted.Erase(it); - } - } - - void EraseInserted(TWriteId writeId) { - auto it = Inserted.FindWithoutPromote(writeId); - if (it != Inserted.End()) { - InsertedBytes -= (*it).first.BlobSize(); - Inserted.Erase(it); - } - } - - void EraseCommitted(const TUnifiedBlobId& blobId) { - auto it = Committed.FindWithoutPromote(blobId); - if (it != Committed.End()) { - CommittedBytes -= blobId.BlobSize(); - Committed.Erase(it); - } - } - - TInsertedBatch GetInserted(TWriteId writeId) const { - auto it = Inserted.Find(writeId); - if (it != Inserted.End()) { - return *it; - } - return {}; - } - - std::shared_ptr Get(const TUnifiedBlobId& blobId) const { - auto it = Committed.Find(blobId); - if (it != Committed.End()) { - return *it; - } - return {}; - } - - ui64 Bytes() const { - return InsertedBytes + CommittedBytes; - } - -private: - mutable TLRUCache Inserted; - mutable TLRUCache> Committed; - ui64 InsertedBytes{0}; - ui64 CommittedBytes{0}; -}; - -} diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 7eab4998c03c..4c988c5fd761 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -201,66 +201,56 @@ NOlap::TSnapshot TColumnShard::GetMinReadSnapshot() const { return NOlap::TSnapshot::MaxForPlanStep(minReadStep); } -TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const { +TInsertWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const { auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId); if (it != LongTxWritesByUniqueId.end()) { auto itPart = it->second.find(partId); if (itPart != it->second.end()) { - return (TWriteId)itPart->second->WriteId; + return itPart->second->InsertWriteId; } } - return (TWriteId)0; + return (TInsertWriteId)0; } -TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId, const std::optional granuleShardingVersionId) { +TInsertWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId, const std::optional granuleShardingVersionId) { auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId); if (it != LongTxWritesByUniqueId.end()) { auto itPart = it->second.find(partId); if (itPart != it->second.end()) { - return (TWriteId)itPart->second->WriteId; + return itPart->second->InsertWriteId; } } else { it = LongTxWritesByUniqueId.emplace(longTxId.UniqueId, TPartsForLTXShard()).first; } - TWriteId writeId = BuildNextWriteId(db); - auto& lw = LongTxWrites[writeId]; - lw.WriteId = (ui64)writeId; + TInsertWriteId insertWriteId = InsertTable->BuildNextWriteId(db); + auto& lw = LongTxWrites[insertWriteId]; + lw.InsertWriteId = insertWriteId; lw.WritePartId = partId; lw.LongTxId = longTxId; lw.GranuleShardingVersionId = granuleShardingVersionId; it->second[partId] = &lw; - Schema::SaveLongTxWrite(db, writeId, partId, longTxId, granuleShardingVersionId); - return writeId; -} - -TWriteId TColumnShard::BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc) { - NIceDb::TNiceDb db(txc.DB); - return BuildNextWriteId(db); -} - -TWriteId TColumnShard::BuildNextWriteId(NIceDb::TNiceDb& db) { - TWriteId writeId = ++LastWriteId; - Schema::SaveSpecialValue(db, Schema::EValueIds::LastWriteId, (ui64)writeId); - return writeId; + Schema::SaveLongTxWrite(db, insertWriteId, partId, longTxId, granuleShardingVersionId); + return insertWriteId; } -void TColumnShard::AddLongTxWrite(TWriteId writeId, ui64 txId) { - auto& lw = LongTxWrites.at(writeId); - lw.PreparedTxId = txId; +void TColumnShard::AddLongTxWrite(const TInsertWriteId writeId, ui64 txId) { + auto it = LongTxWrites.find(writeId); + AFL_VERIFY(it != LongTxWrites.end()); + it->second.PreparedTxId = txId; } -void TColumnShard::LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId, const std::optional granuleShardingVersion) { +void TColumnShard::LoadLongTxWrite(const TInsertWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId, const std::optional granuleShardingVersion) { auto& lw = LongTxWrites[writeId]; lw.WritePartId = writePartId; - lw.WriteId = (ui64)writeId; + lw.InsertWriteId = writeId; lw.LongTxId = longTxId; lw.GranuleShardingVersionId = granuleShardingVersion; LongTxWritesByUniqueId[longTxId.UniqueId][writePartId] = &lw; } -bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId, const ui64 txId) { +bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TInsertWriteId writeId, const ui64 txId) { if (auto* lw = LongTxWrites.FindPtr(writeId)) { ui64 prepared = lw->PreparedTxId; if (!prepared || txId == prepared) { @@ -282,8 +272,8 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId } } -void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet&& writesToAbort) { - std::vector failedAborts; +void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet&& writesToAbort) { + std::vector failedAborts; for (auto& writeId : writesToAbort) { if (!RemoveLongTxWrite(db, writeId, 0)) { failedAborts.push_back(writeId); @@ -631,14 +621,14 @@ class TTTLChangesReadTask: public TChangesReadTask, public TMonitoringObjectsCou using TBase::TBase; }; -void TColumnShard::StartIndexTask(std::vector&& dataToIndex, const i64 bytesToIndex) { +void TColumnShard::StartIndexTask(std::vector&& dataToIndex, const i64 bytesToIndex) { Counters.GetCSCounters().IndexationInput(bytesToIndex); - std::vector data; + std::vector data; data.reserve(dataToIndex.size()); for (auto& ptr : dataToIndex) { data.push_back(*ptr); - if (!TablesManager.HasTable(data.back().PathId)) { + if (!TablesManager.HasTable(data.back().GetPathId())) { data.back().SetRemove(); } } @@ -691,7 +681,7 @@ void TColumnShard::SetupIndexation() { Counters.GetCSCounters().OnSetupIndexation(); ui64 bytesToIndex = 0; ui64 txBytesWrite = 0; - std::vector dataToIndex; + std::vector dataToIndex; dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT); for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) { for (auto* pathInfo : it->second) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 8208385ea9b2..950c6c8655c7 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -3,7 +3,6 @@ #include "background_controller.h" #include "counters.h" #include "columnshard.h" -#include "columnshard_common.h" #include "columnshard_ttl.h" #include "columnshard_private_events.h" #include "tables_manager.h" @@ -425,7 +424,7 @@ class TColumnShard std::optional StartInstant; struct TLongTxWriteInfo { - ui64 WriteId; + TInsertWriteId InsertWriteId; ui32 WritePartId; NLongTxService::TLongTxId LongTxId; ui64 PreparedTxId = 0; @@ -435,7 +434,6 @@ class TColumnShard ui64 CurrentSchemeShardId = 0; TMessageSeqNo LastSchemaSeqNo; std::optional ProcessingParams; - TWriteId LastWriteId = TWriteId{0}; ui64 LastPlannedStep = 0; ui64 LastPlannedTxId = 0; NOlap::TSnapshot LastCompletedTx = NOlap::TSnapshot::Zero(); @@ -470,7 +468,7 @@ class TColumnShard std::optional ProgressTxInFlight; THashMap ScanTxInFlight; - THashMap LongTxWrites; + THashMap LongTxWrites; using TPartsForLTXShard = THashMap; THashMap LongTxWritesByUniqueId; TMultiMap WaitingScans; @@ -483,7 +481,7 @@ class TColumnShard void TryRegisterMediatorTimeCast(); void UnregisterMediatorTimeCast(); - void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet&& writesToAbort); + void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet&& writesToAbort); bool WaitPlanStep(ui64 step); void SendWaitPlanStep(ui64 step); @@ -496,14 +494,11 @@ class TColumnShard return ProgressTxController->GetTxCompleteLag(mediatorTime); } - TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const; - TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId, const std::optional granuleShardingVersionId); - void AddLongTxWrite(TWriteId writeId, ui64 txId); - void LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId, const std::optional granuleShardingVersion); - bool RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId, const ui64 txId); - - TWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc); - TWriteId BuildNextWriteId(NIceDb::TNiceDb& db); + TInsertWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const; + TInsertWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId, const std::optional granuleShardingVersionId); + void AddLongTxWrite(const TInsertWriteId writeId, ui64 txId); + void LoadLongTxWrite(const TInsertWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId, const std::optional granuleShardingVersion); + bool RemoveLongTxWrite(NIceDb::TNiceDb& db, const TInsertWriteId writeId, const ui64 txId); void EnqueueBackgroundActivities(const bool periodic = false); virtual void Enqueue(STFUNC_SIG) override; @@ -518,7 +513,7 @@ class TColumnShard void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc); - void StartIndexTask(std::vector&& dataToIndex, const i64 bytesToIndex); + void StartIndexTask(std::vector&& dataToIndex, const i64 bytesToIndex); void SetupIndexation(); void SetupCompaction(); bool SetupTtl(const THashMap& pathTtls = {}); diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 359e4adbfc2b..eefcfca8f6cc 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -4,27 +4,25 @@ namespace NKikimr::NColumnShard { bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) { - auto rowset = db.Table().GreaterOrEqual(0, 0, 0, 0, "").Select(); + auto rowset = db.Table().Select(); if (!rowset.IsReady()) { return false; } while (!rowset.EndOfSet()) { EInsertTableIds recType = (EInsertTableIds)rowset.GetValue(); - ui64 planStep = rowset.GetValue(); - ui64 writeTxId = rowset.GetValueOrDefault(); - ui64 pathId = rowset.GetValue(); - TString dedupId = rowset.GetValue(); - TString strBlobId = rowset.GetValue(); - TString metaStr = rowset.GetValue(); - ui64 schemaVersion = rowset.HaveValue() ? rowset.GetValue() : 0; + const ui64 planStep = rowset.GetValue(); + const ui64 writeTxId = rowset.GetValueOrDefault(); + const ui64 pathId = rowset.GetValue(); + const TString dedupId = rowset.GetValue(); + const ui64 schemaVersion = rowset.HaveValue() ? rowset.GetValue() : 0; TString error; - NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error); + NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(rowset.GetValue(), dsGroupSelector, error); Y_ABORT_UNLESS(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str()); NKikimrTxColumnShard::TLogicalMetadata meta; - if (metaStr) { + if (auto metaStr = rowset.GetValue()) { Y_ABORT_UNLESS(meta.ParseFromString(metaStr)); } @@ -36,19 +34,20 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG if (rowset.HaveValue()) { rangeSize = rowset.GetValue(); } - AFL_VERIFY(!!rangeOffset == !!rangeSize); - TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, rangeOffset.value_or(0), rangeSize.value_or(blobId.BlobSize())), meta, schemaVersion, {}); + + auto userData = std::make_shared(pathId, + NOlap::TBlobRange(blobId, rangeOffset.value_or(0), rangeSize.value_or(blobId.BlobSize())), meta, schemaVersion, std::nullopt); switch (recType) { case EInsertTableIds::Inserted: - insertTable.AddInserted(std::move(data), true); + insertTable.AddInserted(NOlap::TInsertedData((TInsertWriteId)writeTxId, userData), true); break; case EInsertTableIds::Committed: - insertTable.AddCommitted(std::move(data), true); + insertTable.AddCommitted(NOlap::TCommittedData(userData, planStep, writeTxId, dedupId), true); break; case EInsertTableIds::Aborted: - insertTable.AddAborted(std::move(data), true); + insertTable.AddAborted(NOlap::TInsertedData((TInsertWriteId)writeTxId, userData), true); break; } if (!rowset.Next()) { diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 5fb0122b203a..d9f0b58a0415 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -17,7 +17,7 @@ class TColumnChunkLoadContext; namespace NKikimr::NColumnShard { -using NOlap::TWriteId; +using NOlap::TInsertWriteId; using NOlap::IBlobGroupSelector; struct TFullTxInfo; @@ -31,6 +31,7 @@ struct Schema : NIceDb::Schema { using TSettings = SchemaSettings; using TInsertedData = NOlap::TInsertedData; + using TCommittedData = NOlap::TCommittedData; using TColumnRecord = NOlap::TColumnRecord; enum EIndexTables : ui32 { @@ -784,7 +785,7 @@ struct Schema : NIceDb::Schema { db.Table().Key(pathId).Delete(); } - static void SaveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId, const std::optional granuleShardingVersion) { + static void SaveLongTxWrite(NIceDb::TNiceDb& db, const TInsertWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId, const std::optional granuleShardingVersion) { NKikimrLongTxService::TLongTxId proto; longTxId.ToProto(&proto); TString serialized; @@ -796,32 +797,49 @@ struct Schema : NIceDb::Schema { ); } - static void EraseLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId) { + static void EraseLongTxWrite(NIceDb::TNiceDb& db, const TInsertWriteId writeId) { db.Table().Key((ui64)writeId).Delete(); } // InsertTable activities - static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { - db.Table().Key((ui8)recType, data.PlanStep, data.WriteTxId, data.PathId, data.DedupId).Update( - NIceDb::TUpdate(data.GetBlobRange().GetBlobId().ToStringLegacy()), - NIceDb::TUpdate(data.GetBlobRange().Offset), - NIceDb::TUpdate(data.GetBlobRange().Size), - NIceDb::TUpdate(data.GetMeta().SerializeToProto().SerializeAsString()), - NIceDb::TUpdate(data.GetSchemaVersion()) - ); + static void InsertTable_Upsert(NIceDb::TNiceDb& db, const EInsertTableIds recType, const TInsertedData& data) { + db.Table() + .Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "") + .Update(NIceDb::TUpdate(data.GetBlobRange().GetBlobId().ToStringLegacy()), + NIceDb::TUpdate(data.GetBlobRange().Offset), + NIceDb::TUpdate(data.GetBlobRange().Size), + NIceDb::TUpdate(data.GetMeta().SerializeToProto().SerializeAsString()), + NIceDb::TUpdate(data.GetSchemaVersion())); + } + + static void InsertTable_Upsert(NIceDb::TNiceDb& db, const TCommittedData& data) { + db.Table() + .Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(), + data.GetDedupId()) + .Update(NIceDb::TUpdate(data.GetBlobRange().GetBlobId().ToStringLegacy()), + NIceDb::TUpdate(data.GetBlobRange().Offset), + NIceDb::TUpdate(data.GetBlobRange().Size), + NIceDb::TUpdate(data.GetMeta().SerializeToProto().SerializeAsString()), + NIceDb::TUpdate(data.GetSchemaVersion())); } static void InsertTable_Erase(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { - db.Table().Key((ui8)recType, data.PlanStep, data.WriteTxId, data.PathId, data.DedupId).Delete(); + db.Table().Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "").Delete(); + } + + static void InsertTable_Erase(NIceDb::TNiceDb& db, const TCommittedData& data) { + db.Table() + .Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(), data.GetDedupId()) + .Delete(); } static void InsertTable_Insert(NIceDb::TNiceDb& db, const TInsertedData& data) { InsertTable_Upsert(db, EInsertTableIds::Inserted, data); } - static void InsertTable_Commit(NIceDb::TNiceDb& db, const TInsertedData& data) { - InsertTable_Upsert(db, EInsertTableIds::Committed, data); + static void InsertTable_Commit(NIceDb::TNiceDb& db, const TCommittedData& data) { + InsertTable_Upsert(db, data); } static void InsertTable_Abort(NIceDb::TNiceDb& db, const TInsertedData& data) { @@ -832,8 +850,8 @@ struct Schema : NIceDb::Schema { InsertTable_Erase(db, EInsertTableIds::Inserted, data); } - static void InsertTable_EraseCommitted(NIceDb::TNiceDb& db, const TInsertedData& data) { - InsertTable_Erase(db, EInsertTableIds::Committed, data); + static void InsertTable_EraseCommitted(NIceDb::TNiceDb& db, const TCommittedData& data) { + InsertTable_Erase(db, data); } static void InsertTable_EraseAborted(NIceDb::TNiceDb& db, const TInsertedData& data) { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 077a486f6837..22ca7fd2c738 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -98,7 +98,7 @@ class TPathFieldsInfo { return UsageColumnIds; } - void AddChunkInfo(const TInsertedData& data, const TConstructionContext& context) { + void AddChunkInfo(const TCommittedData& data, const TConstructionContext& context) { AFL_VERIFY(!Finished); if (UsageColumnIds.size() == FullColumnsCount) { return; @@ -143,7 +143,7 @@ class TPathData { return result; } - void AddChunkInfo(const NOlap::TInsertedData& data, const TConstructionContext& context) { + void AddChunkInfo(const NOlap::TCommittedData& data, const TConstructionContext& context) { ColumnsInfo.AddChunkInfo(data, context); } @@ -151,7 +151,7 @@ class TPathData { return ColumnsInfo.HasDeletion(); } - void AddBatch(const NOlap::TInsertedData& data, const std::shared_ptr& batch) { + void AddBatch(const NOlap::TCommittedData& data, const std::shared_ptr& batch) { AFL_VERIFY(ColumnsInfo.IsFinished()); AFL_VERIFY(batch); Batches.emplace_back(batch, data.GetMeta().GetModificationType()); @@ -186,18 +186,18 @@ class TPathesData { return Data; } - void AddChunkInfo(const NOlap::TInsertedData& inserted, const TConstructionContext& context) { - auto shardingFilterCommit = context.SchemaVersions.GetShardingInfoOptional(inserted.PathId, inserted.GetSnapshot()); - auto it = Data.find(inserted.PathId); + void AddChunkInfo(const NOlap::TCommittedData& inserted, const TConstructionContext& context) { + auto shardingFilterCommit = context.SchemaVersions.GetShardingInfoOptional(inserted.GetPathId(), inserted.GetSnapshot()); + auto it = Data.find(inserted.GetPathId()); if (it == Data.end()) { - it = Data.emplace(inserted.PathId, TPathData(shardingFilterCommit, ResultSchema)).first; + it = Data.emplace(inserted.GetPathId(), TPathData(shardingFilterCommit, ResultSchema)).first; } it->second.AddChunkInfo(inserted, context); it->second.AddShardingInfo(shardingFilterCommit); } - void AddBatch(const NOlap::TInsertedData& inserted, const std::shared_ptr& batch) { - auto it = Data.find(inserted.PathId); + void AddBatch(const NOlap::TCommittedData& inserted, const std::shared_ptr& batch) { + auto it = Data.find(inserted.GetPathId()); AFL_VERIFY(it != Data.end()); it->second.AddBatch(inserted, batch); } @@ -246,7 +246,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont } IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot()); - auto& pathInfo = pathBatches.GetPathInfo(inserted.PathId); + auto& pathInfo = pathBatches.GetPathInfo(inserted.GetPathId()); if (pathInfo.HasDeletion()) { IIndexInfo::AddDeleteFlagsColumn(*batch, inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete); diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index a93bfd4dec6c..4c7f8602a6f5 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -1,16 +1,21 @@ #pragma once -#include "abstract/abstract.h" #include "with_appended.h" -#include -#include + +#include "abstract/abstract.h" + #include +#include +#include + +#include namespace NKikimr::NOlap { class TInsertColumnEngineChanges: public TChangesWithAppend { private: using TBase = TChangesWithAppend; - std::vector DataToIndex; + std::vector DataToIndex; + protected: virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override; @@ -34,13 +39,12 @@ class TInsertColumnEngineChanges: public TChangesWithAppend { public: THashMap PathToGranule; // pathId -> positions (sorted by pk) public: - TInsertColumnEngineChanges(std::vector&& dataToIndex, const TSaverContext& saverContext) + TInsertColumnEngineChanges(std::vector&& dataToIndex, const TSaverContext& saverContext) : TBase(saverContext, NBlobOperations::EConsumer::INDEXATION) - , DataToIndex(std::move(dataToIndex)) - { + , DataToIndex(std::move(dataToIndex)) { } - const std::vector& GetDataToIndex() const { + const std::vector& GetDataToIndex() const { return DataToIndex; } @@ -52,7 +56,6 @@ class TInsertColumnEngineChanges: public TChangesWithAppend { return StaticTypeName(); } std::optional AddPathIfNotExists(ui64 pathId); - }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 401bc202d3bb..2c616c06e32d 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -280,7 +280,7 @@ class IColumnEngine { } virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0; virtual std::shared_ptr Select(ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter) const = 0; - virtual std::shared_ptr StartInsert(std::vector&& dataToIndex) noexcept = 0; + virtual std::shared_ptr StartInsert(std::vector&& dataToIndex) noexcept = 0; virtual std::shared_ptr StartCompaction(const std::shared_ptr& dataLocksManager) noexcept = 0; virtual std::shared_ptr StartCleanupPortions(const TSnapshot& snapshot, const THashSet& pathsToDrop, const std::shared_ptr& dataLocksManager) noexcept = 0; virtual std::shared_ptr StartCleanupTables(const THashSet& pathsToDrop) noexcept = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index af54ca1d248e..78ee4bea4ae4 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -271,7 +271,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { return db.LoadCounters(callback); } -std::shared_ptr TColumnEngineForLogs::StartInsert(std::vector&& dataToIndex) noexcept { +std::shared_ptr TColumnEngineForLogs::StartInsert(std::vector&& dataToIndex) noexcept { Y_ABORT_UNLESS(dataToIndex.size()); TSaverContext saverContext(StoragesManager); @@ -279,7 +279,7 @@ std::shared_ptr TColumnEngineForLogs::StartInsert(st auto pkSchema = VersionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey(); for (const auto& data : changes->GetDataToIndex()) { - const ui64 pathId = data.PathId; + const ui64 pathId = data.GetPathId(); if (changes->PathToGranule.contains(pathId)) { continue; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 92a2ce63a71f..7b515c26f40c 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -109,7 +109,7 @@ class TColumnEngineForLogs : public IColumnEngine { return limit < TGranulesStat::GetSumMetadataMemoryPortionsSize(); } - std::shared_ptr StartInsert(std::vector&& dataToIndex) noexcept override; + std::shared_ptr StartInsert(std::vector&& dataToIndex) noexcept override; std::shared_ptr StartCompaction(const std::shared_ptr& dataLocksManager) noexcept override; std::shared_ptr StartCleanupPortions(const TSnapshot& snapshot, const THashSet& pathsToDrop, const std::shared_ptr& dataLocksManager) noexcept override; std::shared_ptr StartCleanupTables(const THashSet& pathsToDrop) noexcept override; diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index d38cdc53c1ae..b5c8e5e4ea58 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -12,7 +12,7 @@ void TDbWrapper::Insert(const TInsertedData& data) { NColumnShard::Schema::InsertTable_Insert(db, data); } -void TDbWrapper::Commit(const TInsertedData& data) { +void TDbWrapper::Commit(const TCommittedData& data) { NIceDb::TNiceDb db(Database); NColumnShard::Schema::InsertTable_Commit(db, data); } @@ -27,7 +27,7 @@ void TDbWrapper::EraseInserted(const TInsertedData& data) { NColumnShard::Schema::InsertTable_EraseInserted(db, data); } -void TDbWrapper::EraseCommitted(const TInsertedData& data) { +void TDbWrapper::EraseCommitted(const TCommittedData& data) { NIceDb::TNiceDb db(Database); NColumnShard::Schema::InsertTable_EraseCommitted(db, data); } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index 39536cb9c987..50958b6fca29 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -16,7 +16,8 @@ namespace NKikimr::NOlap { class TColumnChunkLoadContext; class TIndexChunkLoadContext; -struct TInsertedData; +class TInsertedData; +class TCommittedData; class TInsertTableAccessor; class TColumnRecord; class TIndexChunk; @@ -30,10 +31,10 @@ class IDbWrapper { virtual ~IDbWrapper() = default; virtual void Insert(const TInsertedData& data) = 0; - virtual void Commit(const TInsertedData& data) = 0; + virtual void Commit(const TCommittedData& data) = 0; virtual void Abort(const TInsertedData& data) = 0; virtual void EraseInserted(const TInsertedData& data) = 0; - virtual void EraseCommitted(const TInsertedData& data) = 0; + virtual void EraseCommitted(const TCommittedData& data) = 0; virtual void EraseAborted(const TInsertedData& data) = 0; virtual bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) = 0; @@ -63,10 +64,10 @@ class TDbWrapper : public IDbWrapper { {} void Insert(const TInsertedData& data) override; - void Commit(const TInsertedData& data) override; + void Commit(const TCommittedData& data) override; void Abort(const TInsertedData& data) override; void EraseInserted(const TInsertedData& data) override; - void EraseCommitted(const TInsertedData& data) override; + void EraseCommitted(const TCommittedData& data) override; void EraseAborted(const TInsertedData& data) override; bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) override; diff --git a/ydb/core/tx/columnshard/engines/defs.cpp b/ydb/core/tx/columnshard/engines/defs.cpp new file mode 100644 index 000000000000..2c50c99d1d3c --- /dev/null +++ b/ydb/core/tx/columnshard/engines/defs.cpp @@ -0,0 +1,11 @@ +#include "defs.h" + +template <> +void Out(IOutputStream& os, TTypeTraits::TFuncParam val) { + os << (ui64)val; +} + +template <> +void Out(IOutputStream& os, TTypeTraits::TFuncParam val) { + os << (ui64)val; +} diff --git a/ydb/core/tx/columnshard/engines/defs.h b/ydb/core/tx/columnshard/engines/defs.h index a01edc7ef767..efe1f1c744a8 100644 --- a/ydb/core/tx/columnshard/engines/defs.h +++ b/ydb/core/tx/columnshard/engines/defs.h @@ -8,18 +8,33 @@ namespace NKikimr::NOlap { using TLogThis = TCtorLogger; -enum class TWriteId : ui64 {}; +enum class TOperationWriteId : ui64 { +}; +enum class TInsertWriteId : ui64 { +}; + +inline TOperationWriteId operator++(TOperationWriteId& w) noexcept { + w = TOperationWriteId{ ui64(w) + 1 }; + return w; +} -inline TWriteId operator++(TWriteId& w) noexcept { - w = TWriteId{ui64(w) + 1}; +inline TInsertWriteId operator++(TInsertWriteId& w) noexcept { + w = TInsertWriteId{ ui64(w) + 1 }; return w; } -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap + +template <> +struct THash { + inline size_t operator()(const NKikimr::NOlap::TInsertWriteId x) const noexcept { + return THash()(ui64(x)); + } +}; template <> -struct THash { - inline size_t operator()(const NKikimr::NOlap::TWriteId x) const noexcept { +struct THash { + inline size_t operator()(const NKikimr::NOlap::TOperationWriteId x) const noexcept { return THash()(ui64(x)); } }; diff --git a/ydb/core/tx/columnshard/engines/insert_table/committed.cpp b/ydb/core/tx/columnshard/engines/insert_table/committed.cpp new file mode 100644 index 000000000000..bd4bb9ff6c06 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/committed.cpp @@ -0,0 +1,5 @@ +#include "committed.h" + +namespace NKikimr::NOlap { + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/committed.h b/ydb/core/tx/columnshard/engines/insert_table/committed.h new file mode 100644 index 000000000000..ed2ffdadb3ca --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/committed.h @@ -0,0 +1,164 @@ +#pragma once +#include "user_data.h" + +#include + +namespace NKikimr::NOlap { + +class TCommittedData: public TUserDataContainer { +private: + using TBase = TUserDataContainer; + YDB_READONLY(TSnapshot, Snapshot, NOlap::TSnapshot::Zero()); + YDB_READONLY_DEF(TString, DedupId); + YDB_READONLY(bool, Remove, false); + +public: + TCommittedData(const std::shared_ptr& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId) + : TBase(userData) + , Snapshot(planStep, txId) + , DedupId(ToString(planStep) + ":" + ToString((ui64)insertWriteId)) { + } + + TCommittedData(const std::shared_ptr& userData, const ui64 planStep, const ui64 txId, const TString& dedupId) + : TBase(userData) + , Snapshot(planStep, txId) + , DedupId(dedupId) { + } + + TCommittedData(const std::shared_ptr& userData, const TSnapshot& ss, const TInsertWriteId insertWriteId) + : TBase(userData) + , Snapshot(ss) + , DedupId(ToString(ss.GetPlanStep()) + ":" + ToString((ui64)insertWriteId)) { + } + + void SetRemove() { + AFL_VERIFY(!Remove); + Remove = true; + } + + bool operator<(const TCommittedData& key) const { + if (Snapshot == key.Snapshot) { + if (UserData->GetPathId() == key.UserData->GetPathId()) { + return DedupId < key.DedupId; + } else { + return UserData->GetPathId() < key.UserData->GetPathId(); + } + } else { + return Snapshot < key.Snapshot; + } + } +}; + +class TCommittedBlob { +private: + TBlobRange BlobRange; + std::variant WriteInfo; + YDB_READONLY(ui64, SchemaVersion, 0); + YDB_READONLY(ui64, RecordsCount, 0); + YDB_READONLY(bool, IsDelete, false); + YDB_READONLY_DEF(std::optional, First); + YDB_READONLY_DEF(std::optional, Last); + YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset); + +public: + ui64 GetSize() const { + return BlobRange.Size; + } + + const NArrow::TReplaceKey& GetFirstVerified() const { + Y_ABORT_UNLESS(First); + return *First; + } + + const NArrow::TReplaceKey& GetLastVerified() const { + Y_ABORT_UNLESS(Last); + return *Last; + } + + TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount, + const std::optional& first, const std::optional& last, const bool isDelete, + const NArrow::TSchemaSubset& subset) + : BlobRange(blobRange) + , WriteInfo(snapshot) + , SchemaVersion(schemaVersion) + , RecordsCount(recordsCount) + , IsDelete(isDelete) + , First(first) + , Last(last) + , SchemaSubset(subset) { + } + + TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount, + const std::optional& first, const std::optional& last, const bool isDelete, + const NArrow::TSchemaSubset& subset) + : BlobRange(blobRange) + , WriteInfo(writeId) + , SchemaVersion(schemaVersion) + , RecordsCount(recordsCount) + , IsDelete(isDelete) + , First(first) + , Last(last) + , SchemaSubset(subset) { + } + + /// It uses trick then we place key with planStep:txId in container and find them later by BlobId only. + /// So hash() and equality should depend on BlobId only. + bool operator==(const TCommittedBlob& key) const { + return BlobRange == key.BlobRange; + } + ui64 Hash() const noexcept { + return BlobRange.Hash(); + } + TString DebugString() const { + if (auto* ss = GetSnapshotOptional()) { + return TStringBuilder() << BlobRange << ";snapshot=" << ss->DebugString(); + } else { + return TStringBuilder() << BlobRange << ";write_id=" << (ui64)GetWriteIdVerified(); + } + } + + bool HasSnapshot() const { + return GetSnapshotOptional(); + } + + const TSnapshot& GetSnapshotDef(const TSnapshot& def) const { + if (auto* snapshot = GetSnapshotOptional()) { + return *snapshot; + } else { + return def; + } + } + + const TSnapshot* GetSnapshotOptional() const { + return std::get_if(&WriteInfo); + } + + const TSnapshot& GetSnapshotVerified() const { + auto* result = GetSnapshotOptional(); + AFL_VERIFY(result); + return *result; + } + + const TInsertWriteId* GetWriteIdOptional() const { + return std::get_if(&WriteInfo); + } + + TInsertWriteId GetWriteIdVerified() const { + auto* result = GetWriteIdOptional(); + AFL_VERIFY(result); + return *result; + } + + const TBlobRange& GetBlobRange() const { + return BlobRange; + } +}; + +} // namespace NKikimr::NOlap + +template <> +struct THash { + inline size_t operator()(const NKikimr::NOlap::TCommittedBlob& key) const { + return key.Hash(); + } +}; diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h deleted file mode 100644 index 16e15936836c..000000000000 --- a/ydb/core/tx/columnshard/engines/insert_table/data.h +++ /dev/null @@ -1,253 +0,0 @@ -#pragma once -#include "meta.h" - -#include -#include -#include -#include -#include - -namespace NKikimr::NOlap { - -struct TInsertedData { -private: - TInsertedDataMeta Meta; - YDB_READONLY_DEF(TBlobRange, BlobRange); - class TBlobStorageGuard { - private: - YDB_READONLY_DEF(TString, Data); - - public: - TBlobStorageGuard(const TString& data) - : Data(data) { - } - ~TBlobStorageGuard(); - }; - - std::shared_ptr BlobDataGuard; - YDB_READONLY(bool, Remove, false); - -public: - ui64 PlanStep = 0; - ui64 WriteTxId = 0; - ui64 PathId = 0; - TString DedupId; - -private: - YDB_READONLY(ui64, SchemaVersion, 0); - YDB_READONLY_FLAG(NotAbortable, false); - -public: - void SetRemove() { - AFL_VERIFY(!Remove); - Remove = true; - } - - void MarkAsNotAbortable() { - NotAbortableFlag = true; - } - - std::optional GetBlobData() const { - if (BlobDataGuard) { - return BlobDataGuard->GetData(); - } else { - return {}; - } - } - - ui64 GetTxVolume() const { - return Meta.GetTxVolume() + sizeof(TBlobRange); - } - - const TInsertedDataMeta& GetMeta() const { - return Meta; - } - - TInsertedData() = delete; // avoid invalid TInsertedData anywhere - - TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional& blobData); - - TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, const NKikimrTxColumnShard::TLogicalMetadata& proto, - const ui64 schemaVersion, const std::optional& blobData) - : TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion, blobData) { - } - - TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional& blobData) - : TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion, blobData) { - } - - ~TInsertedData(); - - bool operator<(const TInsertedData& key) const { - if (PlanStep < key.PlanStep) { - return true; - } else if (PlanStep > key.PlanStep) { - return false; - } - - // PlanStep == key.PlanStep - if (WriteTxId < key.WriteTxId) { - return true; - } else if (WriteTxId > key.WriteTxId) { - return false; - } - - // PlanStep == key.PlanStep && WriteTxId == key.WriteTxId - if (PathId < key.PathId) { - return true; - } else if (PathId > key.PathId) { - return false; - } - - return DedupId < key.DedupId; - } - - bool operator==(const TInsertedData& key) const { - return (PlanStep == key.PlanStep) && (WriteTxId == key.WriteTxId) && (PathId == key.PathId) && (DedupId == key.DedupId); - } - - /// We commit many writeIds in one txId. There could be several blobs with same WriteId and different DedupId. - /// One of them wins and becomes committed. Original DedupId would be lost then. - /// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}. - /// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId} - void Commit(const ui64 planStep, const ui64 txId) { - DedupId = ToString(PlanStep) + ":" + ToString((ui64)WriteTxId); - PlanStep = planStep; - WriteTxId = txId; - } - - /// Undo Commit() operation. Restore Initiator:WriteId from DedupId. - void Undo() { - TVector tokens; - size_t numTokens = Split(DedupId, ":", tokens); - Y_ABORT_UNLESS(numTokens == 2); - - PlanStep = FromString(tokens[0]); - WriteTxId = FromString(tokens[1]); - DedupId.clear(); - } - - TSnapshot GetSnapshot() const { - return TSnapshot(PlanStep, WriteTxId); - } - - ui32 BlobSize() const { - return BlobRange.GetBlobSize(); - } -}; - -class TCommittedBlob { -private: - TBlobRange BlobRange; - std::variant WriteInfo; - YDB_READONLY(ui64, SchemaVersion, 0); - YDB_READONLY(ui64, RecordsCount, 0); - YDB_READONLY(bool, IsDelete, false); - YDB_READONLY_DEF(std::optional, First); - YDB_READONLY_DEF(std::optional, Last); - YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset); - -public: - ui64 GetSize() const { - return BlobRange.Size; - } - - const NArrow::TReplaceKey& GetFirstVerified() const { - Y_ABORT_UNLESS(First); - return *First; - } - - const NArrow::TReplaceKey& GetLastVerified() const { - Y_ABORT_UNLESS(Last); - return *Last; - } - - TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount, - const std::optional& first, const std::optional& last, const bool isDelete, - const NArrow::TSchemaSubset& subset) - : BlobRange(blobRange) - , WriteInfo(snapshot) - , SchemaVersion(schemaVersion) - , RecordsCount(recordsCount) - , IsDelete(isDelete) - , First(first) - , Last(last) - , SchemaSubset(subset) { - } - - TCommittedBlob(const TBlobRange& blobRange, const TWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount, - const std::optional& first, const std::optional& last, const bool isDelete, - const NArrow::TSchemaSubset& subset) - : BlobRange(blobRange) - , WriteInfo(writeId) - , SchemaVersion(schemaVersion) - , RecordsCount(recordsCount) - , IsDelete(isDelete) - , First(first) - , Last(last) - , SchemaSubset(subset) { - } - - /// It uses trick then we place key with planStep:txId in container and find them later by BlobId only. - /// So hash() and equality should depend on BlobId only. - bool operator==(const TCommittedBlob& key) const { - return BlobRange == key.BlobRange; - } - ui64 Hash() const noexcept { - return BlobRange.Hash(); - } - TString DebugString() const { - if (auto* ss = GetSnapshotOptional()) { - return TStringBuilder() << BlobRange << ";snapshot=" << ss->DebugString(); - } else { - return TStringBuilder() << BlobRange << ";write_id=" << (ui64)GetWriteIdVerified(); - } - } - - bool HasSnapshot() const { - return GetSnapshotOptional(); - } - - const TSnapshot& GetSnapshotDef(const TSnapshot& def) const { - if (auto* snapshot = GetSnapshotOptional()) { - return *snapshot; - } else { - return def; - } - } - - const TSnapshot* GetSnapshotOptional() const { - return std::get_if(&WriteInfo); - } - - const TSnapshot& GetSnapshotVerified() const { - auto* result = GetSnapshotOptional(); - AFL_VERIFY(result); - return *result; - } - - const TWriteId* GetWriteIdOptional() const { - return std::get_if(&WriteInfo); - } - - TWriteId GetWriteIdVerified() const { - auto* result = GetWriteIdOptional(); - AFL_VERIFY(result); - return *result; - } - - const TBlobRange& GetBlobRange() const { - return BlobRange; - } -}; - -} // namespace NKikimr::NOlap - -template <> -struct THash { - inline size_t operator()(const NKikimr::NOlap::TCommittedBlob& key) const { - return key.Hash(); - } -}; diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 0aaf7499f8a1..a948c0077077 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -1,6 +1,7 @@ #include "insert_table.h" #include +#include #include #include @@ -17,7 +18,7 @@ bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { } TInsertionSummary::TCounters TInsertTable::Commit( - IDbWrapper& dbTable, ui64 planStep, ui64 txId, const THashSet& writeIds, std::function pathExists) { + IDbWrapper& dbTable, ui64 planStep, ui64 txId, const THashSet& writeIds, std::function pathExists) { Y_ABORT_UNLESS(!writeIds.empty()); TInsertionSummary::TCounters counters; @@ -34,14 +35,14 @@ TInsertionSummary::TCounters TInsertTable::Commit( dbTable.EraseInserted(*data); - const ui64 pathId = data->PathId; + const ui64 pathId = data->GetPathId(); auto* pathInfo = Summary.GetPathInfoOptional(pathId); // There could be commit after drop: propose, drop, plan if (pathInfo && pathExists(pathId)) { - data->Commit(planStep, txId); - dbTable.Commit(*data); + auto committed = data->Commit(planStep, txId); + dbTable.Commit(committed); - pathInfo->AddCommitted(std::move(*data)); + pathInfo->AddCommitted(std::move(committed)); } else { dbTable.Abort(*data); Summary.AddAborted(std::move(*data)); @@ -51,7 +52,7 @@ TInsertionSummary::TCounters TInsertTable::Commit( return counters; } -void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet& writeIds) { +void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet& writeIds) { Y_ABORT_UNLESS(!writeIds.empty()); for (auto writeId : writeIds) { @@ -64,19 +65,19 @@ void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet& writeIds } } -THashSet TInsertTable::OldWritesToAbort(const TInstant& now) const { +THashSet TInsertTable::OldWritesToAbort(const TInstant& now) const { return Summary.GetExpiredInsertions(now - WaitCommitDelay, CleanupPackageSize); } void TInsertTable::EraseCommittedOnExecute( - IDbWrapper& dbTable, const TInsertedData& data, const std::shared_ptr& blobsAction) { + IDbWrapper& dbTable, const TCommittedData& data, const std::shared_ptr& blobsAction) { if (Summary.HasCommitted(data)) { dbTable.EraseCommitted(data); RemoveBlobLinkOnExecute(data.GetBlobRange().BlobId, blobsAction); } } -void TInsertTable::EraseCommittedOnComplete(const TInsertedData& data) { +void TInsertTable::EraseCommittedOnComplete(const TCommittedData& data) { if (Summary.EraseCommitted(data)) { RemoveBlobLinkOnComplete(data.GetBlobRange().BlobId); } @@ -84,21 +85,26 @@ void TInsertTable::EraseCommittedOnComplete(const TInsertedData& data) { void TInsertTable::EraseAbortedOnExecute( IDbWrapper& dbTable, const TInsertedData& data, const std::shared_ptr& blobsAction) { - if (Summary.HasAborted((TWriteId)data.WriteTxId)) { + if (Summary.HasAborted(data.GetInsertWriteId())) { dbTable.EraseAborted(data); RemoveBlobLinkOnExecute(data.GetBlobRange().BlobId, blobsAction); } } void TInsertTable::EraseAbortedOnComplete(const TInsertedData& data) { - if (Summary.EraseAborted((TWriteId)data.WriteTxId)) { + if (Summary.EraseAborted(data.GetInsertWriteId())) { RemoveBlobLinkOnComplete(data.GetBlobRange().BlobId); } } -bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant loadTime) { +bool TInsertTable::Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant loadTime) { Y_ABORT_UNLESS(!Loaded); Loaded = true; + LastWriteId = (TInsertWriteId)0; + if (!NColumnShard::Schema::GetSpecialValueOpt(db, NColumnShard::Schema::EValueIds::LastWriteId, LastWriteId)) { + return false; + } + return dbTable.Load(*this, loadTime); } @@ -113,14 +119,15 @@ std::vector TInsertTable::Read( result.reserve(pInfo->GetCommitted().size() + pInfo->GetInserted().size()); for (const auto& data : pInfo->GetCommitted()) { - if (lockId || data.GetSnapshot() <= reqSnapshot) - result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), - data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema), - data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); + if (lockId || data.GetSnapshot() <= reqSnapshot) { + result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), + data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema), + data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); + } } if (lockId) { for (const auto& [writeId, data] : pInfo->GetInserted()) { - result.emplace_back(TCommittedBlob(data.GetBlobRange(), TWriteId(writeId), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), + result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema), data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } @@ -128,6 +135,17 @@ std::vector TInsertTable::Read( return result; } +TInsertWriteId TInsertTable::BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc) { + NIceDb::TNiceDb db(txc.DB); + return BuildNextWriteId(db); +} + +TInsertWriteId TInsertTable::BuildNextWriteId(NIceDb::TNiceDb& db) { + TInsertWriteId writeId = ++LastWriteId; + NColumnShard::Schema::SaveSpecialValue(db, NColumnShard::Schema::EValueIds::LastWriteId, (ui64)writeId); + return writeId; +} + bool TInsertTableAccessor::RemoveBlobLinkOnExecute( const TUnifiedBlobId& blobId, const std::shared_ptr& blobsAction) { AFL_VERIFY(blobsAction); diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index 360c457b3c3a..324cbbf87946 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -1,7 +1,11 @@ #pragma once -#include "data.h" -#include "rt_insertion.h" +#include "committed.h" +#include "inserted.h" #include "path_info.h" +#include "rt_insertion.h" + +#include +#include #include namespace NKikimr::NOlap { @@ -23,6 +27,7 @@ class TInsertTableAccessor { bool RemoveBlobLinkOnExecute(const TUnifiedBlobId& blobId, const std::shared_ptr& blobsAction); bool RemoveBlobLinkOnComplete(const TUnifiedBlobId& blobId); + public: void ErasePath(const ui64 pathId) { Summary.ErasePath(pathId); @@ -57,18 +62,22 @@ class TInsertTableAccessor { } return Summary.AddAborted(std::move(data), load); } - bool AddCommitted(TInsertedData&& data, const bool load) { + bool AddCommitted(TCommittedData&& data, const bool load) { if (load) { AddBlobLink(data.GetBlobRange().BlobId); } - const ui64 pathId = data.PathId; + const ui64 pathId = data.GetPathId(); return Summary.GetPathInfo(pathId).AddCommitted(std::move(data), load); } bool HasPathIdData(const ui64 pathId) const { return Summary.HasPathIdData(pathId); } - const THashMap& GetAborted() const { return Summary.GetAborted(); } - const THashMap& GetInserted() const { return Summary.GetInserted(); } + const THashMap& GetAborted() const { + return Summary.GetAborted(); + } + const THashMap& GetInserted() const { + return Summary.GetInserted(); + } const TInsertionSummary::TCounters& GetCountersPrepared() const { return Summary.GetCountersPrepared(); } @@ -83,28 +92,34 @@ class TInsertTableAccessor { class TInsertTable: public TInsertTableAccessor { private: bool Loaded = false; + TInsertWriteId LastWriteId = TInsertWriteId{ 0 }; + public: static constexpr const TDuration WaitCommitDelay = TDuration::Minutes(10); static constexpr ui64 CleanupPackageSize = 10000; bool Insert(IDbWrapper& dbTable, TInsertedData&& data); - TInsertionSummary::TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, - const THashSet& writeIds, std::function pathExists); - void Abort(IDbWrapper& dbTable, const THashSet& writeIds); - void MarkAsNotAbortable(const TWriteId writeId) { + TInsertionSummary::TCounters Commit( + IDbWrapper& dbTable, ui64 planStep, ui64 txId, const THashSet& writeIds, std::function pathExists); + void Abort(IDbWrapper& dbTable, const THashSet& writeIds); + void MarkAsNotAbortable(const TInsertWriteId writeId) { Summary.MarkAsNotAbortable(writeId); } - THashSet OldWritesToAbort(const TInstant& now) const; + THashSet OldWritesToAbort(const TInstant& now) const; - void EraseCommittedOnExecute(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr& blobsAction); - void EraseCommittedOnComplete(const TInsertedData& key); + void EraseCommittedOnExecute( + IDbWrapper& dbTable, const TCommittedData& key, const std::shared_ptr& blobsAction); + void EraseCommittedOnComplete(const TCommittedData& key); void EraseAbortedOnExecute(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr& blobsAction); void EraseAbortedOnComplete(const TInsertedData& key); std::vector Read( ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, const std::shared_ptr& pkSchema) const; - bool Load(IDbWrapper& dbTable, const TInstant loadTime); + bool Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant loadTime); + + TInsertWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc); + TInsertWriteId BuildNextWriteId(NIceDb::TNiceDb& db); }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/insert_table/inserted.cpp b/ydb/core/tx/columnshard/engines/insert_table/inserted.cpp new file mode 100644 index 000000000000..2986fc0b4c35 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/inserted.cpp @@ -0,0 +1,12 @@ +#include "committed.h" +#include "inserted.h" + +#include + +namespace NKikimr::NOlap { + +TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) { + return TCommittedData(UserData, planStep, txId, InsertWriteId); +} + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/insert_table/inserted.h b/ydb/core/tx/columnshard/engines/insert_table/inserted.h new file mode 100644 index 000000000000..e124edeb57e5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/inserted.h @@ -0,0 +1,35 @@ +#pragma once +#include "user_data.h" + +#include + +namespace NKikimr::NOlap { + +class TCommittedData; + +class TInsertedData: public TUserDataContainer { +private: + using TBase = TUserDataContainer; + YDB_READONLY(TInsertWriteId, InsertWriteId, TInsertWriteId(0)); + YDB_READONLY_FLAG(NotAbortable, false); + +public: + void MarkAsNotAbortable() { + NotAbortableFlag = true; + } + + TInsertedData() = delete; // avoid invalid TInsertedData anywhere + + TInsertedData(const TInsertWriteId writeId, const std::shared_ptr& userData) + : TBase(userData) + , InsertWriteId(writeId) { + } + + /// We commit many writeIds in one txId. There could be several blobs with same WriteId and different DedupId. + /// One of them wins and becomes committed. Original DedupId would be lost then. + /// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}. + /// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId} + [[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId); +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp index 695f3a413d86..e7f4041d6b09 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp @@ -29,7 +29,7 @@ void TPathInfo::AddInsertedSize(const i64 size, const ui64 overloadLimit) { SetInsertedOverload((ui64)InsertedSize > overloadLimit); } -bool TPathInfo::EraseCommitted(const TInsertedData& data) { +bool TPathInfo::EraseCommitted(const TCommittedData& data) { Summary->RemovePriority(*this); const bool result = Committed.erase(data); AddCommittedSize(-1 * (i64)data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); @@ -39,11 +39,11 @@ bool TPathInfo::EraseCommitted(const TInsertedData& data) { return result; } -bool TPathInfo::HasCommitted(const TInsertedData& data) { +bool TPathInfo::HasCommitted(const TCommittedData& data) { return Committed.contains(data); } -bool TPathInfo::AddCommitted(TInsertedData&& data, const bool load) { +bool TPathInfo::AddCommitted(TCommittedData&& data, const bool load) { const ui64 dataSize = data.BlobSize(); Summary->RemovePriority(*this); AddCommittedSize(data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); @@ -72,7 +72,7 @@ NKikimr::NOlap::TPathInfoIndexPriority TPathInfo::GetIndexationPriority() const } } -const THashMap& TPathInfo::GetInserted() const { +const THashMap& TPathInfo::GetInserted() const { return Summary->GetInserted(); } diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.h b/ydb/core/tx/columnshard/engines/insert_table/path_info.h index 15de624aa0ae..b5a5ccc32bc9 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/path_info.h +++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.h @@ -1,7 +1,10 @@ #pragma once +#include "committed.h" +#include "inserted.h" + #include + #include -#include "data.h" namespace NKikimr::NOlap { class TInsertionSummary; @@ -17,12 +20,11 @@ class TPathInfoIndexPriority { private: YDB_READONLY(EIndexationPriority, Category, EIndexationPriority::NoPriority); const ui32 Weight; + public: TPathInfoIndexPriority(const EIndexationPriority category, const ui32 weight) : Category(category) - , Weight(weight) - { - + , Weight(weight) { } bool operator!() const { @@ -37,7 +39,7 @@ class TPathInfoIndexPriority { class TPathInfo: public TMoveOnly { private: const ui64 PathId = 0; - TSet Committed; + TSet Committed; YDB_READONLY(i64, CommittedSize, 0); YDB_READONLY(i64, InsertedSize, 0); bool CommittedOverload = false; @@ -55,7 +57,7 @@ class TPathInfo: public TMoveOnly { return Committed.empty() && !InsertedSize; } - const THashMap& GetInserted() const; + const THashMap& GetInserted() const; void AddInsertedSize(const i64 size, const ui64 overloadLimit); @@ -67,18 +69,18 @@ class TPathInfo: public TMoveOnly { TPathInfoIndexPriority GetIndexationPriority() const; - bool EraseCommitted(const TInsertedData& data); - bool HasCommitted(const TInsertedData& data); + bool EraseCommitted(const TCommittedData& data); + bool HasCommitted(const TCommittedData& data); - const TSet& GetCommitted() const { + const TSet& GetCommitted() const { return Committed; } - bool AddCommitted(TInsertedData&& data, const bool load = false); + bool AddCommitted(TCommittedData&& data, const bool load = false); bool IsOverloaded() const { return CommittedOverload || InsertedOverload; } }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index 1e6ee4d61789..8012211c418e 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -89,10 +89,10 @@ void TInsertionSummary::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize AFL_VERIFY(Counters.Inserted.GetDataSize() == (i64)StatsPrepared.Bytes); } -THashSet TInsertionSummary::GetInsertedByPathId(const ui64 pathId) const { - THashSet result; +THashSet TInsertionSummary::GetInsertedByPathId(const ui64 pathId) const { + THashSet result; for (auto& [writeId, data] : Inserted) { - if (data.PathId == pathId) { + if (data.GetPathId() == pathId) { result.insert(writeId); } } @@ -100,12 +100,12 @@ THashSet TInsertionSummary::GetInsertedByPathId(const return result; } -THashSet TInsertionSummary::GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const { +THashSet TInsertionSummary::GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const { if (timeBorder < MinInsertedTs) { return {}; } - THashSet toAbort; + THashSet toAbort; TInstant newMin = TInstant::Max(); for (auto& [writeId, data] : Inserted) { const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime(); @@ -122,7 +122,7 @@ THashSet TInsertionSummary::GetExpiredInsertions(const return toAbort; } -bool TInsertionSummary::EraseAborted(const TWriteId writeId) { +bool TInsertionSummary::EraseAborted(const TInsertWriteId writeId) { auto it = Aborted.find(writeId); if (it == Aborted.end()) { return false; @@ -132,7 +132,7 @@ bool TInsertionSummary::EraseAborted(const TWriteId writeId) { return true; } -bool TInsertionSummary::HasAborted(const TWriteId writeId) { +bool TInsertionSummary::HasAborted(const TInsertWriteId writeId) { auto it = Aborted.find(writeId); if (it == Aborted.end()) { return false; @@ -140,8 +140,8 @@ bool TInsertionSummary::HasAborted(const TWriteId writeId) { return true; } -bool TInsertionSummary::EraseCommitted(const TInsertedData& data) { - TPathInfo* pathInfo = GetPathInfoOptional(data.PathId); +bool TInsertionSummary::EraseCommitted(const TCommittedData& data) { + TPathInfo* pathInfo = GetPathInfoOptional(data.GetPathId()); if (!pathInfo) { Counters.Committed.SkipErase(data.BlobSize()); return false; @@ -155,8 +155,8 @@ bool TInsertionSummary::EraseCommitted(const TInsertedData& data) { } } -bool TInsertionSummary::HasCommitted(const TInsertedData& data) { - TPathInfo* pathInfo = GetPathInfoOptional(data.PathId); +bool TInsertionSummary::HasCommitted(const TCommittedData& data) { + TPathInfo* pathInfo = GetPathInfoOptional(data.GetPathId()); if (!pathInfo) { return false; } @@ -164,19 +164,19 @@ bool TInsertionSummary::HasCommitted(const TInsertedData& data) { } const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) { - const TWriteId writeId((TWriteId)data.WriteTxId); + const TInsertWriteId writeId = data.GetInsertWriteId(); Counters.Aborted.Add(data.BlobSize(), load); auto insertInfo = Aborted.emplace(writeId, std::move(data)); Y_ABORT_UNLESS(insertInfo.second); return &insertInfo.first->second; } -std::optional TInsertionSummary::ExtractInserted(const TWriteId id) { +std::optional TInsertionSummary::ExtractInserted(const TInsertWriteId id) { auto it = Inserted.find(id); if (it == Inserted.end()) { return {}; } else { - auto pathInfo = GetPathInfoOptional(it->second.PathId); + auto pathInfo = GetPathInfoOptional(it->second.GetPathId()); if (pathInfo) { OnEraseInserted(*pathInfo, it->second.BlobSize()); } @@ -187,9 +187,9 @@ std::optional TInsertionSummary::ExtractInserted( } const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) { - TWriteId writeId{ data.WriteTxId }; + const TInsertWriteId writeId = data.GetInsertWriteId(); const ui32 dataSize = data.BlobSize(); - const ui64 pathId = data.PathId; + const ui64 pathId = data.GetPathId(); auto insertInfo = Inserted.emplace(writeId, std::move(data)); if (insertInfo.second) { OnNewInserted(GetPathInfo(pathId), dataSize, load); diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h index e6adccee7249..8329557117e1 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h @@ -1,7 +1,10 @@ #pragma once +#include "inserted.h" +#include "path_info.h" + #include + #include -#include "path_info.h" namespace NKikimr::NOlap { class IBlobsDeclareRemovingAction; @@ -19,8 +22,8 @@ class TInsertionSummary { TCounters StatsCommitted; const NColumnShard::TInsertTableCounters Counters; - THashMap Inserted; - THashMap Aborted; + THashMap Inserted; + THashMap Aborted; mutable TInstant MinInsertedTs = TInstant::Zero(); std::map> Priorities; @@ -33,6 +36,7 @@ class TInsertionSummary { void OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept; void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept; static TAtomicCounter CriticalInserted; + public: bool HasPathIdData(const ui64 pathId) const { auto it = PathInfo.find(pathId); @@ -52,7 +56,7 @@ class TInsertionSummary { PathInfo.erase(it); } - void MarkAsNotAbortable(const TWriteId writeId) { + void MarkAsNotAbortable(const TInsertWriteId writeId) { auto it = Inserted.find(writeId); if (it == Inserted.end()) { return; @@ -60,29 +64,33 @@ class TInsertionSummary { it->second.MarkAsNotAbortable(); } - THashSet GetInsertedByPathId(const ui64 pathId) const; + THashSet GetInsertedByPathId(const ui64 pathId) const; - THashSet GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const; + THashSet GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const; - const THashMap& GetInserted() const { + const THashMap& GetInserted() const { return Inserted; } - const THashMap& GetAborted() const { + const THashMap& GetAborted() const { return Aborted; } const TInsertedData* AddAborted(TInsertedData&& data, const bool load = false); - bool EraseAborted(const TWriteId writeId); - bool HasAborted(const TWriteId writeId); + bool EraseAborted(const TInsertWriteId writeId); + bool HasAborted(const TInsertWriteId writeId); - bool EraseCommitted(const TInsertedData& data); - bool HasCommitted(const TInsertedData& data); + bool EraseCommitted(const TCommittedData& data); + bool HasCommitted(const TCommittedData& data); const TInsertedData* AddInserted(TInsertedData&& data, const bool load = false); - std::optional ExtractInserted(const TWriteId id); + std::optional ExtractInserted(const TInsertWriteId id); - const TCounters& GetCountersPrepared() const { return StatsPrepared; } - const TCounters& GetCountersCommitted() const { return StatsCommitted; } + const TCounters& GetCountersPrepared() const { + return StatsPrepared; + } + const TCounters& GetCountersCommitted() const { + return StatsCommitted; + } const NColumnShard::TInsertTableCounters& GetCounters() const { return Counters; } @@ -101,4 +109,4 @@ class TInsertionSummary { } }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.cpp b/ydb/core/tx/columnshard/engines/insert_table/user_data.cpp similarity index 55% rename from ydb/core/tx/columnshard/engines/insert_table/data.cpp rename to ydb/core/tx/columnshard/engines/insert_table/user_data.cpp index 36c17ba89779..f5d3db9d71d7 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/data.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/user_data.cpp @@ -1,4 +1,4 @@ -#include "data.h" +#include "user_data.h" #include namespace NKikimr::NOlap { @@ -27,27 +27,18 @@ class TInsertTableCacheController { } -TInsertedData::TBlobStorageGuard::~TBlobStorageGuard() { +TUserData::TBlobStorageGuard::~TBlobStorageGuard() { Singleton()->Return(Data.size()); } -TInsertedData::~TInsertedData() { -} - -TInsertedData::TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional& blobData) + TUserData::TUserData(const ui64 pathId, const TBlobRange& blobRange, const NKikimrTxColumnShard::TLogicalMetadata& proto, + const ui64 schemaVersion, const std::optional& blobData) : Meta(proto) , BlobRange(blobRange) - , PlanStep(planStep) - , WriteTxId(writeTxId) , PathId(pathId) - , DedupId(dedupId) , SchemaVersion(schemaVersion) { - if (blobData) { - AFL_VERIFY(blobData->size() == BlobRange.Size); - if (Singleton()->Take(blobData->size())) { - BlobDataGuard = std::make_shared(*blobData); - } + if (blobData && Singleton()->Take(blobData->size())) { + BlobDataGuard = std::make_shared(*blobData); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table/user_data.h b/ydb/core/tx/columnshard/engines/insert_table/user_data.h new file mode 100644 index 000000000000..d734d90524eb --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/user_data.h @@ -0,0 +1,100 @@ +#pragma once +#include "meta.h" + +#include + +#include + +namespace NKikimr::NOlap { + +class TUserData { +private: + TInsertedDataMeta Meta; + YDB_READONLY_DEF(TBlobRange, BlobRange); + class TBlobStorageGuard { + private: + YDB_READONLY_DEF(TString, Data); + + public: + TBlobStorageGuard(const TString& data) + : Data(data) { + } + ~TBlobStorageGuard(); + }; + + std::shared_ptr BlobDataGuard; + YDB_READONLY(ui64, PathId, 0); + YDB_READONLY(ui64, SchemaVersion, 0); + +public: + TUserData() = delete; + TUserData(const ui64 pathId, const TBlobRange& blobRange, const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, + const std::optional& blobData); + + static std::shared_ptr Build(const ui64 pathId, const TBlobRange& blobRange, const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, + const std::optional& blobData) { + return std::make_shared(pathId, blobRange, proto, schemaVersion, blobData); + } + + static std::shared_ptr Build(const ui64 pathId, const TUnifiedBlobId& blobId, const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, + const std::optional& blobData) { + return std::make_shared(pathId, TBlobRange(blobId), proto, schemaVersion, blobData); + } + + std::optional GetBlobData() const { + if (BlobDataGuard) { + return BlobDataGuard->GetData(); + } else { + return std::nullopt; + } + } + + ui64 GetTxVolume() const { + return Meta.GetTxVolume() + sizeof(TBlobRange); + } + + const TInsertedDataMeta& GetMeta() const { + return Meta; + } +}; + +class TUserDataContainer { +protected: + std::shared_ptr UserData; + +public: + TUserDataContainer(const std::shared_ptr& userData) + : UserData(userData) { + AFL_VERIFY(UserData); + } + + ui64 GetSchemaVersion() const { + return UserData->GetSchemaVersion(); + } + + ui32 BlobSize() const { + return GetBlobRange().Size; + } + + ui32 GetTxVolume() const { + return UserData->GetTxVolume(); + } + + ui64 GetPathId() const { + return UserData->GetPathId(); + } + + const TBlobRange& GetBlobRange() const { + return UserData->GetBlobRange(); + } + + std::optional GetBlobData() const { + return UserData->GetBlobData(); + } + + const TInsertedDataMeta& GetMeta() const { + return UserData->GetMeta(); + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/insert_table/ya.make b/ydb/core/tx/columnshard/engines/insert_table/ya.make index fd56354b62e6..852761344626 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/ya.make +++ b/ydb/core/tx/columnshard/engines/insert_table/ya.make @@ -3,7 +3,9 @@ LIBRARY() SRCS( insert_table.cpp rt_insertion.cpp - data.cpp + user_data.cpp + inserted.cpp + committed.cpp path_info.cpp meta.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp index 250239041e83..ee319fa216f2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp @@ -35,7 +35,7 @@ TConclusionStatus TReadMetadata::Init( if (LockId) { for (auto&& i : CommittedBlobs) { if (auto writeId = i.GetWriteIdOptional()) { - auto op = owner->GetOperationsManager().GetOperationVerified(*writeId); + auto op = owner->GetOperationsManager().GetOperationVerified((TOperationWriteId)*writeId); AddWriteIdToCheck(*writeId, op->GetLockId()); } } @@ -119,7 +119,7 @@ void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalIm } } -bool TReadMetadata::IsMyUncommitted(const TWriteId writeId) const { +bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const { AFL_VERIFY(LockSharingInfo); auto it = ConflictedWriteIds.find(writeId); AFL_VERIFY(it != ConflictedWriteIds.end()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h index 3d9c33804f12..1e68dd77d789 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h @@ -45,7 +45,7 @@ struct TReadMetadata : public TReadMetadataBase { }; THashMap> LockConflictCounters; - THashMap ConflictedWriteIds; + THashMap ConflictedWriteIds; virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override; virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override; @@ -73,13 +73,13 @@ struct TReadMetadata : public TReadMetadataBase { return it->second->Val(); } - bool IsWriteConflictable(const TWriteId writeId) const { + bool IsWriteConflictable(const TInsertWriteId writeId) const { auto it = ConflictedWriteIds.find(writeId); AFL_VERIFY(it != ConflictedWriteIds.end()); return it->second.IsConflictable(); } - void AddWriteIdToCheck(const TWriteId writeId, const ui64 lockId) { + void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) { auto it = LockConflictCounters.find(lockId); if (it == LockConflictCounters.end()) { it = LockConflictCounters.emplace(lockId, std::make_shared()).first; @@ -87,9 +87,9 @@ struct TReadMetadata : public TReadMetadataBase { AFL_VERIFY(ConflictedWriteIds.emplace(writeId, TWriteIdInfo(lockId, it->second)).second); } - [[nodiscard]] bool IsMyUncommitted(const TWriteId writeId) const; + [[nodiscard]] bool IsMyUncommitted(const TInsertWriteId writeId) const; - void SetConflictedWriteId(const TWriteId writeId) const { + void SetConflictedWriteId(const TInsertWriteId writeId) const { auto it = ConflictedWriteIds.find(writeId); AFL_VERIFY(it != ConflictedWriteIds.end()); it->second.MarkAsConflictable(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 3a781dadecba..82147598c216 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index a7a59fe67b2d..9f8eab48e28f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -37,7 +37,6 @@ namespace NStorageOptimizer { class IOptimizerPlannerConstructor; } class TPortionInfoWithBlobs; -struct TInsertedData; class TSnapshotColumnInfo; class ISnapshotSchema; using TNameTypeInfo = std::pair; diff --git a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp index 9bf9ced276ea..5dd7c49a023b 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp @@ -18,13 +18,13 @@ class TTestInsertTableDB : public IDbWrapper { public: void Insert(const TInsertedData&) override { } - void Commit(const TInsertedData&) override { + void Commit(const TCommittedData&) override { } void Abort(const TInsertedData&) override { } void EraseInserted(const TInsertedData&) override { } - void EraseCommitted(const TInsertedData&) override { + void EraseCommitted(const TCommittedData&) override { } void EraseAborted(const TInsertedData&) override { } @@ -73,7 +73,7 @@ class TTestInsertTableDB : public IDbWrapper { Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { Y_UNIT_TEST(TestInsertCommit) { - ui64 writeId = 0; + TInsertWriteId writeId = (TInsertWriteId)0; ui64 tableId = 0; TString dedupId = "0"; TUnifiedBlobId blobId1(2222, 1, 1, 100, 2, 0, 1); @@ -81,23 +81,25 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { TTestInsertTableDB dbTable; TInsertTable insertTable; ui64 indexSnapshot = 0; - + // insert, not commited - bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, TLocalHelper::GetMetaProto(), indexSnapshot, {})); + auto userData1 = std::make_shared(tableId, TBlobRange(blobId1), TLocalHelper::GetMetaProto(), indexSnapshot, std::nullopt); + bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, userData1)); UNIT_ASSERT(ok); // insert the same blobId1 again - ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, TLocalHelper::GetMetaProto(), indexSnapshot, {})); + auto userData2 = std::make_shared(tableId, TBlobRange(blobId1), TLocalHelper::GetMetaProto(), indexSnapshot, std::nullopt); + ok = insertTable.Insert(dbTable, TInsertedData(writeId, userData2)); UNIT_ASSERT(!ok); // insert different blodId with the same writeId and dedupId TUnifiedBlobId blobId2(2222, 1, 2, 100, 2, 0, 1); - ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId2, TLocalHelper::GetMetaProto(), indexSnapshot, {})); + auto userData3 = std::make_shared(tableId, TBlobRange(blobId2), TLocalHelper::GetMetaProto(), indexSnapshot, std::nullopt); + ok = insertTable.Insert(dbTable, TInsertedData(writeId, userData3)); UNIT_ASSERT(!ok); // read nothing - auto blobs = insertTable.Read( - tableId, {} , TSnapshot::Zero(), nullptr); + auto blobs = insertTable.Read(tableId, {}, TSnapshot::Zero(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); blobs = insertTable.Read(tableId + 1, {}, TSnapshot::Zero(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); @@ -105,12 +107,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { // commit ui64 planStep = 100; ui64 txId = 42; - insertTable.Commit(dbTable, planStep, txId, { TWriteId{ writeId } }, [](ui64) { + insertTable.Commit(dbTable, planStep, txId, { writeId }, [](ui64) { return true; }); - UNIT_ASSERT_EQUAL(insertTable.GetPathPriorities().size(), 1); - UNIT_ASSERT_EQUAL(insertTable.GetPathPriorities().begin()->second.size(), 1); - UNIT_ASSERT_EQUAL((*insertTable.GetPathPriorities().begin()->second.begin())->GetCommitted().size(), 1); +// UNIT_ASSERT_EQUAL(insertTable.GetPathPriorities().size(), 1); +// UNIT_ASSERT_EQUAL(insertTable.GetPathPriorities().begin()->second.size(), 1); +// UNIT_ASSERT_EQUAL((*insertTable.GetPathPriorities().begin()->second.begin())->GetCommitted().size(), 1); // read old snapshot blobs = insertTable.Read(tableId, {}, TSnapshot::Zero(), nullptr); diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index e61244d19451..ecde3aa56673 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -48,27 +48,27 @@ class TTestDbWrapper : public IDbWrapper { } void Insert(const TInsertedData& data) override { - Inserted.emplace(TWriteId{data.WriteTxId}, data); + Inserted.emplace(data.GetInsertWriteId(), data); } - void Commit(const TInsertedData& data) override { - Committed[data.PathId].emplace(data); + void Commit(const TCommittedData& data) override { + Committed[data.GetPathId()].emplace(data); } void Abort(const TInsertedData& data) override { - Aborted.emplace(TWriteId{data.WriteTxId}, data); + Aborted.emplace(data.GetInsertWriteId(), data); } void EraseInserted(const TInsertedData& data) override { - Inserted.erase(TWriteId{data.WriteTxId}); + Inserted.erase(data.GetInsertWriteId()); } - void EraseCommitted(const TInsertedData& data) override { - Committed[data.PathId].erase(data); + void EraseCommitted(const TCommittedData& data) override { + Committed[data.GetPathId()].erase(data); } void EraseAborted(const TInsertedData& data) override { - Aborted.erase(TWriteId{data.WriteTxId}); + Aborted.erase(data.GetInsertWriteId()); } bool Load(TInsertTableAccessor& accessor, @@ -189,9 +189,9 @@ class TTestDbWrapper : public IDbWrapper { } private: - THashMap Inserted; - THashMap> Committed; - THashMap Aborted; + THashMap Inserted; + THashMap> Committed; + THashMap Aborted; THashMap Indices; }; @@ -294,12 +294,8 @@ void AddIdsToBlobs(std::vector& portions, NBlo } } -bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, std::vector&& dataToIndex, +bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, std::vector&& dataToIndex, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) { - for (ui32 i = 0; i < dataToIndex.size(); ++i) { - // Commited data always has nonzero planstep (for WriteLoadRead tests) - dataToIndex[i].PlanStep = i + 1; - }; std::shared_ptr changes = engine.StartInsert(std::move(dataToIndex)); if (!changes) { return false; @@ -447,16 +443,16 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] // load - TSnapshot indexSnaphot(1, 1); - TColumnEngineForLogs engine(0, CommonStoragesManager, indexSnaphot, TIndexInfo(tableInfo)); + TSnapshot indexSnapshot(1, 1); + TColumnEngineForLogs engine(0, CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); for (auto&& i : paths) { engine.RegisterTable(i); } engine.Load(db); - std::vector dataToIndex = { - TInsertedData(2, paths[0], "", blobRanges[0].BlobId, TLocalHelper::GetMetaProto(), 0, {}), - TInsertedData(1, paths[0], "", blobRanges[1].BlobId, TLocalHelper::GetMetaProto(), 0, {}) + std::vector dataToIndex = { + TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), (TInsertWriteId)2), + TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), (TInsertWriteId)1) }; // write @@ -474,7 +470,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // selects auto lastSchema = engine.GetVersionedIndex().GetLastSchema(); - UNIT_ASSERT_EQUAL(lastSchema->GetSnapshot(), indexSnaphot); + UNIT_ASSERT_EQUAL(lastSchema->GetSnapshot(), indexSnapshot); const TIndexInfo& indexInfo = lastSchema->GetIndexInfo(); THashSet oneColumnId = { indexInfo.GetColumnIdVerified(testColumns[0].GetName()) }; THashSet columnIds; @@ -554,11 +550,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { blobs.Add(IStoragesManager::DefaultStorageId, blobRange, std::move(str1)); // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] - std::vector dataToIndex; + std::vector dataToIndex; + TSnapshot ss(planStep, txId); dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, TLocalHelper::GetMetaProto(), 0, {})); + TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId)); - bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); + bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); } @@ -652,11 +649,11 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { blobs.Add(IStoragesManager::DefaultStorageId, blobRange, std::move(testBlob)); // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] - std::vector dataToIndex; - dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, TLocalHelper::GetMetaProto(), 0, {})); + std::vector dataToIndex; + TSnapshot ss(planStep, txId); + dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId)); - bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); + bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step); blobsAll.Merge(std::move(blobs)); UNIT_ASSERT(ok); } @@ -683,11 +680,11 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { blobs.Add(IStoragesManager::DefaultStorageId, blobRange, std::move(testBlob)); // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] - std::vector dataToIndex; - dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, TLocalHelper::GetMetaProto(), 0, {})); + std::vector dataToIndex; + TSnapshot ss(planStep, txId); + dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId))); - bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); + bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); } @@ -730,11 +727,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { blobs.Add(IStoragesManager::DefaultStorageId, blobRange, std::move(str1)); // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] - std::vector dataToIndex; + TSnapshot ss(planStep, txId); + std::vector dataToIndex; dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, TLocalHelper::GetMetaProto(), 0, {})); + TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId))); - bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); + bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); blobStartTs += blobTsRange; if (txId == txCount / 2) { diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 0a4a5281497b..92e59e9b197c 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -91,7 +91,7 @@ class TWriteAggregation { YDB_READONLY(ui64, Size, 0); YDB_READONLY(ui64, Rows, 0); YDB_ACCESSOR_DEF(std::vector, SplittedBlobs); - YDB_READONLY_DEF(TVector, WriteIds); + YDB_READONLY_DEF(TVector, InsertWriteIds); YDB_READONLY_DEF(std::shared_ptr, BlobsAction); YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset); std::shared_ptr RecordBatch; @@ -110,8 +110,8 @@ class TWriteAggregation { return WriteMeta; } - void AddWriteId(const TWriteId& id) { - WriteIds.emplace_back(id); + void AddInsertWriteId(const TInsertWriteId id) { + InsertWriteIds.emplace_back(id); } TWriteAggregation(const NEvWrite::TWriteData& writeData, std::vector&& splittedBlobs, const std::shared_ptr& batch) diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make index 4772008f14f1..66b72ec25122 100644 --- a/ydb/core/tx/columnshard/engines/ya.make +++ b/ydb/core/tx/columnshard/engines/ya.make @@ -13,6 +13,7 @@ SRCS( filter.cpp portion_info.cpp tier_info.cpp + defs.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/operations/manager.cpp b/ydb/core/tx/columnshard/operations/manager.cpp index f6c8dcfc1e74..6d6633097efd 100644 --- a/ydb/core/tx/columnshard/operations/manager.cpp +++ b/ydb/core/tx/columnshard/operations/manager.cpp @@ -13,7 +13,7 @@ bool TOperationsManager::Load(NTabletFlatExecutor::TTransactionContext& txc) { } while (!rowset.EndOfSet()) { - const TWriteId writeId = (TWriteId)rowset.GetValue(); + const TOperationWriteId writeId = (TOperationWriteId)rowset.GetValue(); const ui64 createdAtSec = rowset.GetValue(); const ui64 lockId = rowset.GetValue(); const ui64 cookie = rowset.GetValueOrDefault(0); @@ -138,7 +138,7 @@ void TOperationsManager::AbortTransactionOnComplete(TColumnShard& owner, const u OnTransactionFinishOnComplete(aborted, *lock, txId); } -TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) const { +TWriteOperation::TPtr TOperationsManager::GetOperation(const TOperationWriteId writeId) const { auto it = Operations.find(writeId); if (it == Operations.end()) { return nullptr; @@ -176,7 +176,7 @@ void TOperationsManager::RemoveOperationOnComplete(const TWriteOperation::TPtr& Operations.erase(op->GetWriteId()); } -TWriteId TOperationsManager::BuildNextWriteId() { +TOperationWriteId TOperationsManager::BuildNextOperationWriteId() { return ++LastWriteId; } @@ -199,7 +199,7 @@ void TOperationsManager::LinkTransactionOnComplete(const ui64 /*lockId*/, const TWriteOperation::TPtr TOperationsManager::RegisterOperation( const ui64 lockId, const ui64 cookie, const std::optional granuleShardingVersionId, const NEvWrite::EModificationType mType) { - auto writeId = BuildNextWriteId(); + auto writeId = BuildNextOperationWriteId(); auto operation = std::make_shared( writeId, lockId, cookie, EOperationStatus::Draft, AppData()->TimeProvider->Now(), granuleShardingVersionId, mType); Y_ABORT_UNLESS(Operations.emplace(operation->GetWriteId(), operation).second); diff --git a/ydb/core/tx/columnshard/operations/manager.h b/ydb/core/tx/columnshard/operations/manager.h index 19d91acbaaa5..35bcc42556e3 100644 --- a/ydb/core/tx/columnshard/operations/manager.h +++ b/ydb/core/tx/columnshard/operations/manager.h @@ -120,8 +120,8 @@ class TOperationsManager { THashMap Tx2Lock; THashMap LockFeatures; - THashMap Operations; - TWriteId LastWriteId = TWriteId(0); + THashMap Operations; + TOperationWriteId LastWriteId = TOperationWriteId(0); public: @@ -129,11 +129,11 @@ class TOperationsManager { void AddEventForTx(TColumnShard& owner, const ui64 txId, const std::shared_ptr& writer); void AddEventForLock(TColumnShard& owner, const ui64 lockId, const std::shared_ptr& writer); - TWriteOperation::TPtr GetOperation(const TWriteId writeId) const; - TWriteOperation::TPtr GetOperationVerified(const TWriteId writeId) const { + TWriteOperation::TPtr GetOperation(const TOperationWriteId writeId) const; + TWriteOperation::TPtr GetOperationVerified(const TOperationWriteId writeId) const { return TValidator::CheckNotNull(GetOperationOptional(writeId)); } - TWriteOperation::TPtr GetOperationOptional(const TWriteId writeId) const { + TWriteOperation::TPtr GetOperationOptional(const TOperationWriteId writeId) const { return GetOperation(writeId); } void CommitTransactionOnExecute( @@ -199,7 +199,7 @@ class TOperationsManager { TOperationsManager(); private: - TWriteId BuildNextWriteId(); + TOperationWriteId BuildNextOperationWriteId(); void RemoveOperationOnExecute(const TWriteOperation::TPtr& op, NTabletFlatExecutor::TTransactionContext& txc); void RemoveOperationOnComplete(const TWriteOperation::TPtr& op); void OnTransactionFinishOnExecute(const TVector& operations, const TLockFeatures& lock, const ui64 txId, diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index 2f2e339132c9..4a78726ca457 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -12,7 +12,7 @@ namespace NKikimr::NColumnShard { -TWriteOperation::TWriteOperation(const TWriteId writeId, const ui64 lockId, const ui64 cookie, const EOperationStatus& status, +TWriteOperation::TWriteOperation(const TOperationWriteId writeId, const ui64 lockId, const ui64 cookie, const EOperationStatus& status, const TInstant createdAt, const std::optional granuleShardingVersionId, const NEvWrite::EModificationType mType) : Status(status) , CreatedAt(createdAt) @@ -46,7 +46,7 @@ void TWriteOperation::CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor:: TBlobGroupSelector dsGroupSelector(owner.Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - for (auto gWriteId : GlobalWriteIds) { + for (auto gWriteId : InsertWriteIds) { auto pathExists = [&](ui64 pathId) { return owner.TablesManager.HasTable(pathId); }; @@ -61,10 +61,11 @@ void TWriteOperation::CommitOnComplete(TColumnShard& owner, const NOlap::TSnapsh owner.UpdateInsertTableCounters(); } -void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector& globalWriteIds, const bool ephemeralFlag) { +void TWriteOperation::OnWriteFinish( + NTabletFlatExecutor::TTransactionContext& txc, const std::vector& insertWriteIds, const bool ephemeralFlag) { Y_ABORT_UNLESS(Status == EOperationStatus::Started); Status = EOperationStatus::Prepared; - GlobalWriteIds = globalWriteIds; + InsertWriteIds = insertWriteIds; if (ephemeralFlag) { return; @@ -86,7 +87,7 @@ void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& tx } void TWriteOperation::ToProto(NKikimrTxColumnShard::TInternalOperationData& proto) const { - for (auto&& writeId : GlobalWriteIds) { + for (auto&& writeId : InsertWriteIds) { proto.AddInternalWriteIds((ui64)writeId); } proto.SetModificationType((ui32)ModificationType); @@ -94,7 +95,7 @@ void TWriteOperation::ToProto(NKikimrTxColumnShard::TInternalOperationData& prot void TWriteOperation::FromProto(const NKikimrTxColumnShard::TInternalOperationData& proto) { for (auto&& writeId : proto.GetInternalWriteIds()) { - GlobalWriteIds.push_back(TWriteId(writeId)); + InsertWriteIds.push_back(TInsertWriteId(writeId)); } if (proto.HasModificationType()) { ModificationType = (NEvWrite::EModificationType)proto.GetModificationType(); @@ -109,8 +110,8 @@ void TWriteOperation::AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::T TBlobGroupSelector dsGroupSelector(owner.Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - THashSet writeIds; - writeIds.insert(GlobalWriteIds.begin(), GlobalWriteIds.end()); + THashSet writeIds; + writeIds.insert(InsertWriteIds.begin(), InsertWriteIds.end()); owner.InsertTable->Abort(dbTable, writeIds); } diff --git a/ydb/core/tx/columnshard/operations/write.h b/ydb/core/tx/columnshard/operations/write.h index 8c022457f6bb..ad22caa651d4 100644 --- a/ydb/core/tx/columnshard/operations/write.h +++ b/ydb/core/tx/columnshard/operations/write.h @@ -26,7 +26,8 @@ namespace NKikimr::NColumnShard { class TColumnShard; -using TWriteId = NOlap::TWriteId; +using TOperationWriteId = NOlap::TOperationWriteId; +using TInsertWriteId = NOlap::TInsertWriteId; enum class EOperationStatus : ui32 { Draft = 1, @@ -46,10 +47,10 @@ enum class EOperationBehaviour : ui32 { class TWriteOperation { YDB_READONLY(EOperationStatus, Status, EOperationStatus::Draft); YDB_READONLY_DEF(TInstant, CreatedAt); - YDB_READONLY_DEF(TWriteId, WriteId); + YDB_READONLY_DEF(TOperationWriteId, WriteId); YDB_READONLY(ui64, LockId, 0); YDB_READONLY(ui64, Cookie, 0); - YDB_READONLY_DEF(TVector, GlobalWriteIds); + YDB_READONLY_DEF(std::vector, InsertWriteIds); YDB_ACCESSOR(EOperationBehaviour, Behaviour, EOperationBehaviour::Undefined); YDB_READONLY_DEF(std::optional, GranuleShardingVersionId); YDB_READONLY(NEvWrite::EModificationType, ModificationType, NEvWrite::EModificationType::Upsert); @@ -57,12 +58,12 @@ class TWriteOperation { public: using TPtr = std::shared_ptr; - TWriteOperation(const TWriteId writeId, const ui64 lockId, const ui64 cookie, const EOperationStatus& status, const TInstant createdAt, + TWriteOperation(const TOperationWriteId writeId, const ui64 lockId, const ui64 cookie, const EOperationStatus& status, const TInstant createdAt, const std::optional granuleShardingVersionId, const NEvWrite::EModificationType mType); void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, const std::shared_ptr& schema, const TActorContext& ctx); - void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector& globalWriteIds, const bool ephemeralFlag); + void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const std::vector& insertWriteIds, const bool ephemeralFlag); void CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const; void CommitOnComplete(TColumnShard& owner, const NOlap::TSnapshot& snapshot) const; void AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const; diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp index f064ea4ad3a7..2b0474ad4531 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp @@ -22,7 +22,7 @@ TLongTxTransactionOperator::TProposeResult TLongTxTransactionOperator::DoStartPr auto it = owner.InsertTable->GetInserted().find(writeId); if (it != owner.InsertTable->GetInserted().end()) { - auto granuleShardingInfo = owner.GetIndexAs().GetVersionedIndex().GetShardingInfoActual(it->second.PathId); + auto granuleShardingInfo = owner.GetIndexAs().GetVersionedIndex().GetShardingInfoActual(it->second.GetPathId()); if (granuleShardingInfo && lw.GranuleShardingVersionId && *lw.GranuleShardingVersionId != granuleShardingInfo->GetSnapshotVersion()) { return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " declined through sharding deprecated"); @@ -43,7 +43,7 @@ bool TLongTxTransactionOperator::DoParse(TColumnShard& /*owner*/, const TString& } for (auto& id : commitTxBody.GetWriteIds()) { - WriteIds.insert(TWriteId{ id }); + WriteIds.insert(TInsertWriteId{ id }); } return true; } diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h index 4933095c81de..45b642c8e98a 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h @@ -68,7 +68,7 @@ namespace NKikimr::NColumnShard { owner.Counters.GetTabletCounters()->IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); NIceDb::TNiceDb db(txc.DB); - for (TWriteId writeId : WriteIds) { + for (TInsertWriteId writeId : WriteIds) { AFL_VERIFY(owner.RemoveLongTxWrite(db, writeId, GetTxId())); } owner.UpdateInsertTableCounters(); @@ -84,7 +84,7 @@ namespace NKikimr::NColumnShard { virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { NIceDb::TNiceDb db(txc.DB); - for (TWriteId writeId : WriteIds) { + for (TInsertWriteId writeId : WriteIds) { AFL_VERIFY(owner.RemoveLongTxWrite(db, writeId, GetTxId())); } TBlobGroupSelector dsGroupSelector(owner.Info()); @@ -97,7 +97,7 @@ namespace NKikimr::NColumnShard { } private: - THashSet WriteIds; + THashSet WriteIds; }; } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 716b6eae6174..f1f4df107ffe 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -16,7 +16,6 @@ SRCS( columnshard__write_index.cpp columnshard.cpp columnshard_impl.cpp - columnshard_common.cpp columnshard_private_events.cpp columnshard_schema.cpp columnshard_view.cpp