diff --git a/ydb/core/kqp/ut/olap/tiering_ut.cpp b/ydb/core/kqp/ut/olap/tiering_ut.cpp index 3af46ad7b5a9..b45aeb5b36e1 100644 --- a/ydb/core/kqp/ut/olap/tiering_ut.cpp +++ b/ydb/core/kqp/ut/olap/tiering_ut.cpp @@ -5,6 +5,7 @@ #include "helpers/writer.h" #include +#include #include #include #include @@ -12,157 +13,139 @@ namespace NKikimr::NKqp { -class TTestEvictionBase { -protected: +class TTieringTestHelper { +private: std::optional TestHelper; - TString TieringRule; - -protected: - virtual void UnevictAll() = 0; - virtual TString GetTierPathOverride() const { - return "/Root/tier1"; - } + std::optional OlapHelper; + std::optional> CsController; public: - void RunTest() { - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - csController->SetSkipSpecialCheckForEvict(true); + TTieringTestHelper() { + CsController.emplace(NYDBTest::TControllers::RegisterCSControllerGuard()); + (*CsController)->SetSkipSpecialCheckForEvict(true); TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; TestHelper.emplace(runnerSettings); - TLocalHelper localHelper(TestHelper->GetKikimr()); - // TestHelper->GetRuntime().SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG); - // TestHelper->GetRuntime().SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + OlapHelper.emplace(TestHelper->GetKikimr()); TestHelper->GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG); - // TestHelper->GetRuntime().SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG); - // TestHelper->GetRuntime().SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG); - // TestHelper->GetRuntime().SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG); + TestHelper->GetRuntime().SetLogPriority(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION, NActors::NLog::PRI_DEBUG); + TestHelper->GetRuntime().SetLogPriority(NKikimrServices::TX_COLUMNSHARD_BLOBS_TIER, NActors::NLog::PRI_DEBUG); NYdb::NTable::TTableClient tableClient = TestHelper->GetKikimr().GetTableClient(); Tests::NCommon::TLoggerInit(TestHelper->GetKikimr()).Initialize(); Singleton()->SetSecretKey("fakeSecret"); + } + + TTestHelper& GetTestHelper() { + AFL_VERIFY(TestHelper); + return *TestHelper; + } + + TLocalHelper& GetOlapHelper() { + AFL_VERIFY(OlapHelper); + return *OlapHelper; + } - localHelper.CreateTestOlapTable(); - TestHelper->CreateTier("tier1"); + NYDBTest::TControllers::TGuard& GetCsController() { + AFL_VERIFY(CsController); + return *CsController; + } + void WriteSampleData() { for (ui64 i = 0; i < 100; ++i) { WriteTestData(TestHelper->GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 10000, 1000); WriteTestData(TestHelper->GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 10000, 1000); } + } - csController->WaitCompactions(TDuration::Seconds(5)); - csController->WaitActualization(TDuration::Seconds(5)); + void CheckAllDataInTier(const TString& tierName) { + NYdb::NTable::TTableClient tableClient = TestHelper->GetKikimr().GetTableClient(); + + auto selectQuery = TString(R"( + SELECT + TierName, SUM(ColumnRawBytes) AS RawBytes, SUM(Rows) AS Rows + FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats` + WHERE Activity == 1 + GROUP BY TierName + )"); + + auto rows = ExecuteScanQuery(tableClient, selectQuery); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), tierName); + } +}; - ui64 columnRawBytes = 0; - { - auto selectQuery = TString(R"( - SELECT - TierName, SUM(ColumnRawBytes) As RawBytes - FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats` - WHERE Activity == 1 - GROUP BY TierName - )"); +Y_UNIT_TEST_SUITE(KqpOlapTiering) { - auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "__DEFAULT"); + Y_UNIT_TEST(EvictionResetTiering) { + TTieringTestHelper tieringHelper; + auto& csController = tieringHelper.GetCsController(); + auto& olapHelper = tieringHelper.GetOlapHelper(); + auto& testHelper = tieringHelper.GetTestHelper(); - columnRawBytes = GetUint64(rows[0].at("RawBytes")); - UNIT_ASSERT_GT(columnRawBytes, 0); - } + olapHelper.CreateTestOlapTable(); + testHelper.CreateTier("tier1"); + tieringHelper.WriteSampleData(); + csController->WaitCompactions(TDuration::Seconds(5)); + csController->WaitActualization(TDuration::Seconds(5)); + tieringHelper.CheckAllDataInTier("__DEFAULT"); - TestHelper->SetTiering("/Root/olapStore/olapTable", GetTierPathOverride(), "timestamp"); + testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp"); csController->WaitActualization(TDuration::Seconds(5)); + tieringHelper.CheckAllDataInTier("/Root/tier1"); - { - auto selectQuery = TString(R"( - SELECT - TierName, SUM(ColumnRawBytes) As RawBytes - FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats` - WHERE Activity == 1 - GROUP BY TierName - )"); + testHelper.ResetTiering("/Root/olapStore/olapTable"); + csController->WaitCompactions(TDuration::Seconds(5)); + tieringHelper.CheckAllDataInTier("__DEFAULT"); + } - auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1"); - UNIT_ASSERT_VALUES_EQUAL_C(GetUint64(rows[0].at("RawBytes")), columnRawBytes, - TStringBuilder() << "RawBytes changed after eviction: before=" << columnRawBytes - << " after=" << GetUint64(rows[0].at("RawBytes"))); - } + Y_UNIT_TEST(EvictionIncreaseDuration) { + TTieringTestHelper tieringHelper; + auto& csController = tieringHelper.GetCsController(); + auto& olapHelper = tieringHelper.GetOlapHelper(); + auto& testHelper = tieringHelper.GetTestHelper(); - UnevictAll(); + olapHelper.CreateTestOlapTable(); + testHelper.CreateTier("tier1"); + tieringHelper.WriteSampleData(); csController->WaitCompactions(TDuration::Seconds(5)); + csController->WaitActualization(TDuration::Seconds(5)); + tieringHelper.CheckAllDataInTier("__DEFAULT"); - { - auto selectQuery = TString(R"( - SELECT - TierName, SUM(ColumnRawBytes) As RawBytes - FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats` - WHERE Activity == 1 - GROUP BY TierName - )"); + testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp"); + csController->WaitActualization(TDuration::Seconds(5)); + tieringHelper.CheckAllDataInTier("/Root/tier1"); - auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "__DEFAULT"); - UNIT_ASSERT_VALUES_EQUAL_C(GetUint64(rows[0].at("RawBytes")), columnRawBytes, - TStringBuilder() << "RawBytes changed after resetting tiering: before=" << columnRawBytes - << " after=" << GetUint64(rows[0].at("RawBytes"))); + { + const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P30000D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON timestamp)"; + auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); } - - } -}; - -class TTestEvictionResetTiering : public TTestEvictionBase { - private: - void UnevictAll() { - TestHelper->ResetTiering("/Root/olapStore/olapTable"); - } -}; - -class TTestEvictionWithStrippedEdsPath : public TTestEvictionResetTiering { - private: - TString GetTierPathOverride() const { - return "Root/tier1"; - } -}; - -class TTestEvictionIncreaseDuration : public TTestEvictionBase { - private: - void UnevictAll() { - const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P30000D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON timestamp)"; - auto result = TestHelper->GetSession().ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + csController->WaitCompactions(TDuration::Seconds(5)); + tieringHelper.CheckAllDataInTier("__DEFAULT"); } -}; -Y_UNIT_TEST_SUITE(KqpOlapTiering) { - - Y_UNIT_TEST(EvictionResetTiering) { - TTestEvictionResetTiering().RunTest(); - } + Y_UNIT_TEST(EvictionWithStrippedEdsPath) { + TTieringTestHelper tieringHelper; + auto& csController = tieringHelper.GetCsController(); + auto& olapHelper = tieringHelper.GetOlapHelper(); + auto& testHelper = tieringHelper.GetTestHelper(); - Y_UNIT_TEST(EvictionIncreaseDuration) { - TTestEvictionIncreaseDuration().RunTest(); - } + olapHelper.CreateTestOlapTable(); + testHelper.CreateTier("tier1"); + tieringHelper.WriteSampleData(); - Y_UNIT_TEST(EvictionWithStrippedEdsPath) { - TTestEvictionWithStrippedEdsPath().RunTest(); + testHelper.SetTiering("/Root/olapStore/olapTable", "Root/tier1", "timestamp"); + csController->WaitActualization(TDuration::Seconds(5)); + tieringHelper.CheckAllDataInTier("/Root/tier1"); } Y_UNIT_TEST(TieringValidation) { - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + TTieringTestHelper tieringHelper; + auto& olapHelper = tieringHelper.GetOlapHelper(); + auto& testHelper = tieringHelper.GetTestHelper(); - TKikimrSettings runnerSettings; - runnerSettings.WithSampleTables = false; - TTestHelper testHelper(runnerSettings); - TLocalHelper localHelper(testHelper.GetKikimr()); - testHelper.GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG); - NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient(); - Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize(); - - localHelper.CreateTestOlapTable(); + olapHelper.CreateTestOlapTable(); testHelper.CreateTier("tier1"); { @@ -181,26 +164,15 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) { } Y_UNIT_TEST(DeletedTier) { - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - csController->SetSkipSpecialCheckForEvict(true); - - TKikimrSettings runnerSettings; - runnerSettings.WithSampleTables = false; - TTestHelper testHelper(runnerSettings); - TLocalHelper localHelper(testHelper.GetKikimr()); - testHelper.GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG); + TTieringTestHelper tieringHelper; + auto& csController = tieringHelper.GetCsController(); + auto& olapHelper = tieringHelper.GetOlapHelper(); + auto& testHelper = tieringHelper.GetTestHelper(); NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient(); - Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize(); - Singleton()->SetSecretKey("fakeSecret"); - localHelper.CreateTestOlapTable(); + olapHelper.CreateTestOlapTable(); testHelper.CreateTier("tier1"); - - for (ui64 i = 0; i < 100; ++i) { - WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000); - WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000); - } - + tieringHelper.WriteSampleData(); testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp"); csController->WaitCompactions(TDuration::Seconds(5)); csController->WaitActualization(TDuration::Seconds(5)); @@ -208,27 +180,14 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) { csController->DisableBackground(NYDBTest::ICSController::EBackground::TTL); testHelper.ResetTiering("/Root/olapStore/olapTable"); testHelper.RebootTablets("/Root/olapStore/olapTable"); + tieringHelper.CheckAllDataInTier("/Root/tier1"); + TString selectQuery = R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)"; + ui64 scanResult; { - auto selectQuery = TString(R"( - SELECT - TierName, SUM(ColumnRawBytes) As RawBytes - FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats` - WHERE Activity == 1 - GROUP BY TierName - )"); - auto rows = ExecuteScanQuery(tableClient, selectQuery); UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1"); - } - - ui64 maxLevelValue; - { - auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)"); - auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); - maxLevelValue = GetInt32(rows[0].at("level")); + scanResult = GetInt32(rows[0].at("level")); } { @@ -238,7 +197,6 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) { testHelper.RebootTablets("/Root/olapStore/olapTable"); { - auto selectQuery = TString(R"(SELECT MAX(level) FROM `/Root/olapStore/olapTable`)"); auto it = tableClient.StreamExecuteScanQuery(selectQuery, NYdb::NTable::TStreamExecScanQuerySettings()).GetValueSync(); auto streamPart = it.ReadNext().GetValueSync(); UNIT_ASSERT(!streamPart.IsSuccess()); @@ -249,26 +207,20 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) { testHelper.RebootTablets("/Root/olapStore/olapTable"); { - auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)"); auto rows = ExecuteScanQuery(tableClient, selectQuery); UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(GetInt32(rows[0].at("level")), maxLevelValue); + UNIT_ASSERT_VALUES_EQUAL(GetInt32(rows[0].at("level")), scanResult); } } Y_UNIT_TEST(TtlBorders) { - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - - TKikimrSettings runnerSettings; - runnerSettings.WithSampleTables = false; - TTestHelper testHelper(runnerSettings); - TLocalHelper localHelper(testHelper.GetKikimr()); - testHelper.GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG); + TTieringTestHelper tieringHelper; + auto& csController = tieringHelper.GetCsController(); + auto& olapHelper = tieringHelper.GetOlapHelper(); + auto& testHelper = tieringHelper.GetTestHelper(); NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient(); - Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize(); - Singleton()->SetSecretKey("fakeSecret"); - localHelper.CreateTestOlapTable("olapTable", "olapStore", 1, 1); + olapHelper.CreateTestOlapTable("olapTable", "olapStore", 1, 1); { const TDuration tsInterval = TDuration::Days(3650); @@ -316,6 +268,42 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) { UNIT_ASSERT_GT(GetUint64(rows[0].at("count")), 0); } } + + Y_UNIT_TEST(LocksInterference) { + TTieringTestHelper tieringHelper; + auto& csController = tieringHelper.GetCsController(); + auto& olapHelper = tieringHelper.GetOlapHelper(); + auto& testHelper = tieringHelper.GetTestHelper(); + NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient(); + + olapHelper.CreateTestOlapTable(); + tieringHelper.WriteSampleData(); + csController->WaitCompactions(TDuration::Seconds(5)); + + csController->RegisterLock("table", std::make_shared("table", THashSet({0, 1, 2, 3, 4, 5}), NOlap::NDataLocks::ELockCategory::Compaction)); + { + const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("PT1S") ON timestamp)"; + auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + csController->WaitActualization(TDuration::Seconds(5)); + tieringHelper.CheckAllDataInTier("__DEFAULT"); + + csController->UnregisterLock("table"); + csController->EnableBackground(NYDBTest::ICSController::EBackground::TTL); + csController->WaitActualization(TDuration::Seconds(5)); + csController->WaitTtl(TDuration::Seconds(5)); + { + auto selectQuery = TString(R"( + SELECT * + FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats` + WHERE Activity == 1 + )"); + + auto rows = ExecuteScanQuery(tableClient, selectQuery); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 0); + } + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index 1ac88a6d8283..6fef72e14bef 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -18,6 +18,7 @@ class THelperSchemaless : public NCommon::THelper { void SendDataViaActorSystem(TString testTable, std::shared_ptr batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const; virtual std::shared_ptr TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui64 tsStepUs = 1) const = 0; + virtual ~THelperSchemaless() = default; }; class THelper: public THelperSchemaless { diff --git a/ydb/core/tx/columnshard/data_locks/manager/manager.cpp b/ydb/core/tx/columnshard/data_locks/manager/manager.cpp index 0ded5dc7f4dd..c4bcbd1cc9ac 100644 --- a/ydb/core/tx/columnshard/data_locks/manager/manager.cpp +++ b/ydb/core/tx/columnshard/data_locks/manager/manager.cpp @@ -1,4 +1,5 @@ #include "manager.h" + #include namespace NKikimr::NOlap::NDataLocks { @@ -17,28 +18,12 @@ void TManager::UnregisterLock(const TString& processId) { std::optional TManager::IsLocked( const TPortionInfo& portion, const ELockCategory lockCategory, const THashSet& excludedLocks) const { - for (auto&& i : ProcessLocks) { - if (excludedLocks.contains(i.first)) { - continue; - } - if (auto lockName = i.second->IsLocked(portion, lockCategory, excludedLocks)) { - return lockName; - } - } - return {}; + return IsLockedImpl(portion, lockCategory, excludedLocks); } std::optional TManager::IsLocked( const TGranuleMeta& granule, const ELockCategory lockCategory, const THashSet& excludedLocks) const { - for (auto&& i : ProcessLocks) { - if (excludedLocks.contains(i.first)) { - continue; - } - if (auto lockName = i.second->IsLocked(granule, lockCategory, excludedLocks)) { - return lockName; - } - } - return {}; + return IsLockedImpl(granule, lockCategory, excludedLocks); } std::optional TManager::IsLocked(const std::shared_ptr& portion, const ELockCategory lockCategory, diff --git a/ydb/core/tx/columnshard/data_locks/manager/manager.h b/ydb/core/tx/columnshard/data_locks/manager/manager.h index 7d7d43f57327..6baa49f8fc92 100644 --- a/ydb/core/tx/columnshard/data_locks/manager/manager.h +++ b/ydb/core/tx/columnshard/data_locks/manager/manager.h @@ -1,7 +1,11 @@ #pragma once #include +#include + #include #include +#include + #include namespace NKikimr::NOlap::NDataLocks { @@ -11,6 +15,32 @@ class TManager { THashMap> ProcessLocks; std::shared_ptr StopFlag = std::make_shared(0); void UnregisterLock(const TString& processId); + +private: + template + std::optional IsLockedImpl(const TObject& portion, const ELockCategory lockCategory, const THashSet& excludedLocks) const { + const auto& isLocked = [&](const TString& name, const std::shared_ptr& lock) -> std::optional { + if (excludedLocks.contains(name)) { + return std::nullopt; + } + if (auto lockName = lock->IsLocked(portion, lockCategory, excludedLocks)) { + return lockName; + } + return std::nullopt; + }; + for (auto&& [name, lock] : ProcessLocks) { + if (auto locked = isLocked(name, lock)) { + return locked; + } + } + for (auto&& [name, lock] : NYDBTest::TControllers::GetColumnShardController()->GetExternalDataLocks()) { + if (auto locked = isLocked(name, lock)) { + return locked; + } + } + return {}; + } + public: TManager() = default; diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp index 476056001390..e5b7a1c86885 100644 --- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp +++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp @@ -19,13 +19,13 @@ TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, cons } -bool TTieringProcessContext::AddPortion( +TTieringProcessContext::EAddPortionResult TTieringProcessContext::AddPortion( const std::shared_ptr& info, TPortionEvictionFeatures&& features, const std::optional dWait) { if (!UsedPortions.emplace(info->GetAddress()).second) { - return true; + return EAddPortionResult::PORTION_LOCKED; } if (DataLocksManager->IsLocked(*info, NDataLocks::ELockCategory::Actualization)) { - return true; + return EAddPortionResult::PORTION_LOCKED; } const auto buildNewTask = [&]() { @@ -40,7 +40,7 @@ bool TTieringProcessContext::AddPortion( if (Controller->IsNewTaskAvailable(it->first, it->second.size())) { it->second.emplace_back(buildNewTask()); } else { - return false; + return EAddPortionResult::TASK_LIMIT_EXCEEDED; } features.OnSkipPortionWithProcessMemory(Counters, *dWait); } @@ -49,7 +49,7 @@ bool TTieringProcessContext::AddPortion( if (Controller->IsNewTaskAvailable(it->first, it->second.size())) { it->second.emplace_back(buildNewTask()); } else { - return false; + return EAddPortionResult::TASK_LIMIT_EXCEEDED; } features.OnSkipPortionWithTxLimit(Counters, *dWait); } @@ -70,7 +70,7 @@ bool TTieringProcessContext::AddPortion( it->second.back().GetTask()->AddPortionToEvict(info, std::move(features)); AFL_VERIFY(!it->second.back().GetTask()->GetPortionsToRemove().HasPortions())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString()); } - return true; + return EAddPortionResult::SUCCESS; } } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h index 99dedc4c106a..d528de1c637f 100644 --- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h +++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h @@ -41,6 +41,13 @@ class TTaskConstructor { }; class TTieringProcessContext { +public: + enum class EAddPortionResult { + SUCCESS = 0, + TASK_LIMIT_EXCEEDED, + PORTION_LOCKED, + }; + private: const TVersionedIndex& VersionedIndex; THashSet UsedPortions; @@ -79,7 +86,7 @@ class TTieringProcessContext { return result; } - bool AddPortion(const std::shared_ptr& info, TPortionEvictionFeatures&& features, const std::optional dWait); + EAddPortionResult AddPortion(const std::shared_ptr& info, TPortionEvictionFeatures&& features, const std::optional dWait); bool IsRWAddressAvailable(const TRWAddress& address) const { auto it = Tasks.find(address); diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp index d8e7044101c8..6961c9d65b71 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp @@ -85,11 +85,21 @@ void TSchemeActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, con TPortionEvictionFeatures features(portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId)); features.SetTargetTierName(portion->GetTierNameDef(IStoragesManager::DefaultStorageId)); - if (!tasksContext.AddPortion(portion, std::move(features), {})) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "cannot_add_portion")("context", tasksContext.DebugString()); + bool limitExceeded = false; + switch (tasksContext.AddPortion(portion, std::move(features), {})) { + case TTieringProcessContext::EAddPortionResult::TASK_LIMIT_EXCEEDED: + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "cannot_add_portion")("reason", "limit_exceeded")( + "context", tasksContext.DebugString()); + limitExceeded = true; + break; + case TTieringProcessContext::EAddPortionResult::PORTION_LOCKED: + break; + case TTieringProcessContext::EAddPortionResult::SUCCESS: + portionsToRemove.emplace(portion->GetPortionId()); + break; + } + if (limitExceeded) { break; - } else { - portionsToRemove.emplace(portion->GetPortionId()); } } } diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp index 251b317247cb..f19d3ceacb09 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp @@ -185,11 +185,18 @@ void TTieringActualizer::DoExtractTasks( portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId)); features.SetTargetTierName(info->GetTargetTierName()); - if (!tasksContext.AddPortion(portion, std::move(features), info->GetLateness())) { - limitEnriched = true; + switch (tasksContext.AddPortion(portion, std::move(features), info->GetLateness())) { + case TTieringProcessContext::EAddPortionResult::TASK_LIMIT_EXCEEDED: + limitEnriched = true; + break; + case TTieringProcessContext::EAddPortionResult::PORTION_LOCKED: + break; + case TTieringProcessContext::EAddPortionResult::SUCCESS: + AFL_VERIFY(portionIds.emplace(portion->GetPortionId()).second); + break; + } + if (limitEnriched) { break; - } else { - portionIds.emplace(portion->GetPortionId()); } } if (limitEnriched) { diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 1d148d4cfd25..e4af5b8a3acd 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -29,6 +29,9 @@ class TDataAccessorsResult; namespace NIndexes { class TIndexMetaContainer; } +namespace NDataLocks { +class ILock; +} } // namespace NKikimr::NOlap namespace arrow { class RecordBatch; @@ -330,6 +333,9 @@ class ICSController { Y_UNUSED(txInfo); } + virtual THashMap> GetExternalDataLocks() const { + return {}; + } }; class TControllers { @@ -338,7 +344,7 @@ class TControllers { public: template - class TGuard: TNonCopyable { + class TGuard: TMoveOnly { private: std::shared_ptr Controller; @@ -348,12 +354,22 @@ class TControllers { Y_ABORT_UNLESS(Controller); } + TGuard(TGuard&& other) + : TGuard(other.Controller) { + other.Controller = nullptr; + } + TGuard& operator=(TGuard&& other) { + std::swap(Controller, other.Controller); + } + TController* operator->() { return Controller.get(); } ~TGuard() { - Singleton()->CSController = std::make_shared(); + if (Controller) { + Singleton()->CSController = std::make_shared(); + } } }; diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index d8ee7ae56a50..072faeee6f2a 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -44,6 +44,8 @@ class TController: public TReadOnlyController { TMutex ActiveTabletsMutex; std::set ActiveTablets; + THashMap> ExternalLocks; + class TBlobInfo { private: const NOlap::TUnifiedBlobId BlobId; @@ -218,6 +220,11 @@ class TController: public TReadOnlyController { SharingIds.emplace(sessionId); } + virtual THashMap> GetExternalDataLocks() const override { + TGuard g(Mutex); + return ExternalLocks; + } + public: virtual bool CheckPortionsToMergeOnCompaction(const ui64 /*memoryAfterAdd*/, const ui32 currentSubsetsCount) override { return currentSubsetsCount > 1; @@ -266,6 +273,16 @@ class TController: public TReadOnlyController { CompactionControl = value; } + void RegisterLock(const TString& name, const std::shared_ptr& lock) { + TGuard g(Mutex); + AFL_VERIFY(ExternalLocks.emplace(name, lock).second)("name", name); + } + + void UnregisterLock(const TString& name) { + TGuard g(Mutex); + AFL_VERIFY(ExternalLocks.erase(name))("name", name); + } + bool HasPKSortingOnly() const; void OnSwitchToWork(const ui64 tabletId) override { @@ -292,8 +309,8 @@ class TController: public TReadOnlyController { RestartOnLocalDbTxCommitted = std::move(txInfo); } - virtual void OnAfterLocalTxCommitted(const NActors::TActorContext& ctx, const ::NKikimr::NColumnShard::TColumnShard& shard, const TString& txInfo) override; - + virtual void OnAfterLocalTxCommitted( + const NActors::TActorContext& ctx, const ::NKikimr::NColumnShard::TColumnShard& shard, const TString& txInfo) override; }; } diff --git a/ydb/core/tx/columnshard/hooks/testing/ro_controller.h b/ydb/core/tx/columnshard/hooks/testing/ro_controller.h index c55be9455204..e0944ca1a55a 100644 --- a/ydb/core/tx/columnshard/hooks/testing/ro_controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/ro_controller.h @@ -115,6 +115,19 @@ class TReadOnlyController: public ICSController { } } + void WaitTtl(const TDuration d) const { + TInstant start = TInstant::Now(); + ui32 countStart = GetTTLStartedCounter().Val(); + while (Now() - start < d) { + if (countStart != GetTTLStartedCounter().Val()) { + countStart = GetTTLStartedCounter().Val(); + start = TInstant::Now(); + } + Cerr << "WAIT_TTL: " << GetTTLStartedCounter().Val() << Endl; + Sleep(TDuration::Seconds(1)); + } + } + template void WaitCondition(const TDuration d, const TTester& test) const { const TInstant start = TInstant::Now();