From a8ba671082b60e5199e46af838ddfc763699c32d Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 25 Dec 2024 10:47:35 +0300 Subject: [PATCH 1/4] table have to be readable with snapshot livetime --- .../blobs_action/transaction/tx_write.cpp | 2 +- ydb/core/tx/columnshard/columnshard_impl.cpp | 8 +++-- ydb/core/tx/columnshard/tables_manager.cpp | 31 ++++++++++++------- ydb/core/tx/columnshard/tables_manager.h | 22 ++++++++++--- 4 files changed, 42 insertions(+), 21 deletions(-) diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 8bbcfce0e07d..9c3f34838a25 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -42,7 +42,7 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) { const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); for (auto&& aggr : buffer.GetAggregations()) { const auto& writeMeta = aggr->GetWriteMeta(); - Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); + Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId(), true)); txc.DB.NoMoreReadsForTx(); TWriteOperation::TPtr operation; if (writeMeta.HasLongTxId()) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index f68cc93dd473..d9017da74e15 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1051,8 +1051,10 @@ void TColumnShard::SetupCleanupPortions() { return; } - auto changes = - TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager); + const NOlap::TSnapshot minReadSnapshot = GetMinReadSnapshot(); + THashSet pathsToDrop = TablesManager.GetPathsToDrop(minReadSnapshot); + + auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(minReadSnapshot, pathsToDrop, DataLocksManager); if (!changes) { ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes"); return; @@ -1077,7 +1079,7 @@ void TColumnShard::SetupCleanupTables() { } THashSet pathIdsEmptyInInsertTable; - for (auto&& i : TablesManager.GetPathsToDrop()) { + for (auto&& i : TablesManager.GetPathsToDrop(GetMinReadSnapshot())) { if (InsertTable->HasPathIdData(i)) { continue; } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 63a74e12fb2b..d9cf3e3de74e 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -60,7 +60,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { return false; } if (table.IsDropped()) { - PathsToDrop.insert(table.GetPathId()); + AFL_VERIFY(PathsToDrop[table.GetDropVersionVerified()].emplace(table.GetPathId()).second); } AFL_VERIFY(Tables.emplace(table.GetPathId(), std::move(table)).second); @@ -212,8 +212,8 @@ bool TTablesManager::HasTable(const ui64 pathId, bool withDeleted) const { return true; } -bool TTablesManager::IsReadyForWrite(const ui64 pathId) const { - return HasPrimaryIndex() && HasTable(pathId); +bool TTablesManager::IsReadyForWrite(const ui64 pathId, const bool withDeleted) const { + return HasPrimaryIndex() && HasTable(pathId, withDeleted); } bool TTablesManager::HasPreset(const ui32 presetId) const { @@ -237,7 +237,7 @@ void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& versio AFL_VERIFY(Tables.contains(pathId)); auto& table = Tables[pathId]; table.SetDropVersion(version); - PathsToDrop.insert(pathId); + AFL_VERIFY(PathsToDrop[version].emplace(pathId).second); Ttl.erase(pathId); Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId()); } @@ -363,13 +363,15 @@ TTablesManager::TTablesManager(const std::shared_ptr& s } bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const { - auto itDrop = PathsToDrop.find(pathId); + const auto& itTable = Tables.find(pathId); + AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId); + auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified()); AFL_VERIFY(itDrop != PathsToDrop.end()); + AFL_VERIFY(itDrop->second.contains(pathId)); + AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId)); NIceDb::TNiceDb db(dbTable); NColumnShard::Schema::EraseTableInfo(db, pathId); - const auto& itTable = Tables.find(pathId); - AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId); for (auto&& tableVersion : itTable->second.GetVersions()) { NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion); } @@ -377,13 +379,18 @@ bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, co } bool TTablesManager::TryFinalizeDropPathOnComplete(const ui64 pathId) { - auto itDrop = PathsToDrop.find(pathId); - AFL_VERIFY(itDrop != PathsToDrop.end()); - AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId)); - AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId)); - PathsToDrop.erase(itDrop); const auto& itTable = Tables.find(pathId); AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId); + { + auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified()); + AFL_VERIFY(itDrop != PathsToDrop.end()); + AFL_VERIFY(itDrop->second.erase(pathId)); + if (itDrop->second.empty()) { + PathsToDrop.erase(itDrop); + } + } + AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId)); + AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId)); Tables.erase(itTable); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("method", "TryFinalizeDropPathOnComplete")("path_id", pathId)("size", Tables.size()); return true; diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 2f8d41832814..f6565b9df8ee 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -105,7 +105,12 @@ class TTableInfo { return PathId; } + const NOlap::TSnapshot& GetDropVersionVerified() const { + return DropVersion; + } + void SetDropVersion(const NOlap::TSnapshot& version) { + AFL_VERIFY(!DropVersion)("exists", DropVersion->DebugString())("version", version.DebugString()); DropVersion = version; } @@ -139,7 +144,7 @@ class TTablesManager { THashMap Tables; THashSet SchemaPresetsIds; THashMap ActualSchemaForPreset; - THashSet PathsToDrop; + std::map> PathsToDrop; THashMap Ttl; std::unique_ptr PrimaryIndex; std::shared_ptr StoragesManager; @@ -166,12 +171,19 @@ class TTablesManager { return Ttl; } - const THashSet& GetPathsToDrop() const { + const std::map>& GetPathsToDrop() const { return PathsToDrop; } - THashSet& MutablePathsToDrop() { - return PathsToDrop; + THashSet GetPathsToDrop(const NOlap::TSnapshot& minReadSnapshot) const { + THashSet result; + for (auto&& i : PathsToDrop) { + if (minReadSnapshot < i.first) { + break; + } + result.insert(i.second.begin(), i.second.end()); + } + return result; } const THashMap& GetTables() const { @@ -237,7 +249,7 @@ class TTablesManager { ui64 GetMemoryUsage() const; bool HasTable(const ui64 pathId, bool withDeleted = false) const; - bool IsReadyForWrite(const ui64 pathId) const; + bool IsReadyForWrite(const ui64 pathId, bool withDeleted = false) const; bool HasPreset(const ui32 presetId) const; void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db); From 5092d3fd83b2389fe2abc34dd6d719eb5ef73384 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 25 Dec 2024 10:57:40 +0300 Subject: [PATCH 2/4] fix build --- ydb/core/tx/columnshard/tables_manager.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index f6565b9df8ee..250c20b2f437 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -106,7 +106,8 @@ class TTableInfo { } const NOlap::TSnapshot& GetDropVersionVerified() const { - return DropVersion; + AFL_VERIFY(DropVersion); + return *DropVersion; } void SetDropVersion(const NOlap::TSnapshot& version) { @@ -144,7 +145,7 @@ class TTablesManager { THashMap Tables; THashSet SchemaPresetsIds; THashMap ActualSchemaForPreset; - std::map> PathsToDrop; + std::map> PathsToDrop; THashMap Ttl; std::unique_ptr PrimaryIndex; std::shared_ptr StoragesManager; @@ -171,7 +172,7 @@ class TTablesManager { return Ttl; } - const std::map>& GetPathsToDrop() const { + const std::map>& GetPathsToDrop() const { return PathsToDrop; } From 2c35af14ec7de11dcb0d9c7e3ca9073000fc6bd3 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 25 Dec 2024 11:01:41 +0300 Subject: [PATCH 3/4] fix checker --- .../blobs_action/transaction/tx_write.cpp | 3 ++- ydb/core/tx/columnshard/tables_manager.cpp | 15 +++++++++++++++ ydb/core/tx/columnshard/tables_manager.h | 12 ++++++++++-- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 9c3f34838a25..491638cef8b0 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -40,9 +40,10 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) { NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute"); ACFL_DEBUG("event", "start_execute"); const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); + const auto minReadSnapshot = Self->GetMinReadSnapshot(); for (auto&& aggr : buffer.GetAggregations()) { const auto& writeMeta = aggr->GetWriteMeta(); - Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId(), true)); + Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId(), minReadSnapshot)); txc.DB.NoMoreReadsForTx(); TWriteOperation::TPtr operation; if (writeMeta.HasLongTxId()) { diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index d9cf3e3de74e..629cffd29233 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -212,10 +212,25 @@ bool TTablesManager::HasTable(const ui64 pathId, bool withDeleted) const { return true; } +bool TTablesManager::HasTable(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const { + auto it = Tables.find(pathId); + if (it == Tables.end()) { + return false; + } + if (it->second.IsDropped(minReadSnapshot)) { + return false; + } + return true; +} + bool TTablesManager::IsReadyForWrite(const ui64 pathId, const bool withDeleted) const { return HasPrimaryIndex() && HasTable(pathId, withDeleted); } +bool TTablesManager::IsReadyForWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const { + return HasPrimaryIndex() && HasTable(pathId, minReadSnapshot); +} + bool TTablesManager::HasPreset(const ui32 presetId) const { return SchemaPresetsIds.contains(presetId); } diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 250c20b2f437..f3ccbb67bf50 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -119,8 +119,14 @@ class TTableInfo { Versions.insert(snapshot); } - bool IsDropped() const { - return DropVersion.has_value(); + bool IsDropped(const std::optional& minReadSnapshot = std::nullopt) const { + if (!DropVersion) { + return false; + } + if (!minReadSnapshot) { + return true; + } + return *DropVersion < *minReadSnapshot; } TTableInfo() = default; @@ -250,7 +256,9 @@ class TTablesManager { ui64 GetMemoryUsage() const; bool HasTable(const ui64 pathId, bool withDeleted = false) const; + bool HasTable(const ui64 pathId, const TSnapshot& minReadSnapshot) const; bool IsReadyForWrite(const ui64 pathId, bool withDeleted = false) const; + bool IsReadyForWrite(const ui64 pathId, const TSnapshot& minReadSnapshot) const; bool HasPreset(const ui32 presetId) const; void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db); From 7c1330d996740b068f0156e94890cf0f8bc8ea72 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 25 Dec 2024 11:06:52 +0300 Subject: [PATCH 4/4] correction --- .../transaction/tx_blobs_written.cpp | 3 ++- .../blobs_action/transaction/tx_write.cpp | 2 +- .../tx/columnshard/columnshard__write.cpp | 15 ++----------- ydb/core/tx/columnshard/tables_manager.cpp | 21 +++++-------------- ydb/core/tx/columnshard/tables_manager.h | 7 +++---- 5 files changed, 13 insertions(+), 35 deletions(-) diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp index b365f542df73..7d38657d83e6 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp @@ -15,9 +15,10 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute"); ACFL_DEBUG("event", "start_execute"); auto& index = Self->MutableIndexAs(); + const auto minReadSnapshot = Self->GetMinReadSnapshot(); for (auto&& pack : Packs) { const auto& writeMeta = pack.GetWriteMeta(); - AFL_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); + AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot)); AFL_VERIFY(!writeMeta.HasLongTxId()); auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId()); Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 491638cef8b0..52d3bed0a6b5 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -43,7 +43,7 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) { const auto minReadSnapshot = Self->GetMinReadSnapshot(); for (auto&& aggr : buffer.GetAggregations()) { const auto& writeMeta = aggr->GetWriteMeta(); - Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId(), minReadSnapshot)); + Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot)); txc.DB.NoMoreReadsForTx(); TWriteOperation::TPtr operation; if (writeMeta.HasLongTxId()) { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 68a83bf1a1f7..cb70844d1a9d 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -142,17 +142,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo "writing_id", writeMeta.GetId())("status", putResult.GetPutStatus()); Counters.GetWritesMonitor()->OnFinishWrite(aggr->GetSize(), 1); - if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) { - ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex()); - Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); - - auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(writeMeta.GetSource(), result.release()); - Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::NoTable); - wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); - continue; - } - if (putResult.GetPutStatus() != NKikimrProto::OK) { Counters.GetCSCounters().OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant()); Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); @@ -238,7 +227,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled); } - if (!TablesManager.IsReadyForWrite(pathId)) { + if (!TablesManager.IsReadyForStartWrite(pathId)) { LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index") << " at tablet " << TabletID()); @@ -558,7 +547,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const auto pathId = operation.GetTableId().GetTableId(); - if (!TablesManager.IsReadyForWrite(pathId)) { + if (!TablesManager.IsReadyForStartWrite(pathId)) { sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR); return; } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 629cffd29233..cd6eda199c41 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -201,34 +201,23 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { return true; } -bool TTablesManager::HasTable(const ui64 pathId, bool withDeleted) const { - auto it = Tables.find(pathId); - if (it == Tables.end()) { - return false; - } - if (it->second.IsDropped()) { - return withDeleted; - } - return true; -} - -bool TTablesManager::HasTable(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const { +bool TTablesManager::HasTable(const ui64 pathId, const bool withDeleted, const std::optional minReadSnapshot) const { auto it = Tables.find(pathId); if (it == Tables.end()) { return false; } if (it->second.IsDropped(minReadSnapshot)) { - return false; + return withDeleted; } return true; } -bool TTablesManager::IsReadyForWrite(const ui64 pathId, const bool withDeleted) const { +bool TTablesManager::IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const { return HasPrimaryIndex() && HasTable(pathId, withDeleted); } -bool TTablesManager::IsReadyForWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const { - return HasPrimaryIndex() && HasTable(pathId, minReadSnapshot); +bool TTablesManager::IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const { + return HasPrimaryIndex() && HasTable(pathId, false, minReadSnapshot); } bool TTablesManager::HasPreset(const ui32 presetId) const { diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index f3ccbb67bf50..590eeeb8ea8b 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -255,10 +255,9 @@ class TTablesManager { const TTableInfo& GetTable(const ui64 pathId) const; ui64 GetMemoryUsage() const; - bool HasTable(const ui64 pathId, bool withDeleted = false) const; - bool HasTable(const ui64 pathId, const TSnapshot& minReadSnapshot) const; - bool IsReadyForWrite(const ui64 pathId, bool withDeleted = false) const; - bool IsReadyForWrite(const ui64 pathId, const TSnapshot& minReadSnapshot) const; + bool HasTable(const ui64 pathId, const bool withDeleted = false, const std::optional minReadSnapshot = std::nullopt) const; + bool IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const; + bool IsReadyForFinishWrite(const ui64 pathId, const TSnapshot& minReadSnapshot) const; bool HasPreset(const ui32 presetId) const; void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db);