Skip to content

Commit

Permalink
allow missing tiers in schema
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 committed Jan 23, 2025
1 parent ce2c5e5 commit 4e64227
Show file tree
Hide file tree
Showing 20 changed files with 276 additions and 85 deletions.
14 changes: 14 additions & 0 deletions ydb/core/kqp/ut/olap/helpers/get_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ void PrintValue(IOutputStream& out, const NYdb::TValue& v) {
out << value.GetUint32();
break;
}
case NYdb::EPrimitiveType::Int32:
{
out << value.GetInt32();
break;
}
case NYdb::EPrimitiveType::Uint64:
{
out << value.GetUint64();
Expand Down Expand Up @@ -72,6 +77,15 @@ ui64 GetUint32(const NYdb::TValue& v) {
}
}

i64 GetInt32(const NYdb::TValue& v) {
NYdb::TValueParser value(v);
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
return *value.GetOptionalInt32();
} else {
return value.GetInt32();
}
}

ui64 GetUint64(const NYdb::TValue& v) {
NYdb::TValueParser value(v);
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/helpers/get_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ void PrintRow(IOutputStream& out, const THashMap<TString, NYdb::TValue>& fields)
void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>>& rows);

ui64 GetUint32(const NYdb::TValue& v);
i64 GetInt32(const NYdb::TValue& v);
ui64 GetUint64(const NYdb::TValue& v);
TString GetUtf8(const NYdb::TValue& v);
TInstant GetTimestamp(const NYdb::TValue& v);
Expand Down
76 changes: 76 additions & 0 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,82 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {

testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
}

Y_UNIT_TEST(DeletedTier) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetSkipSpecialCheckForEvict(true);

TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);
TLocalHelper localHelper(testHelper.GetKikimr());
testHelper.GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG);
NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient();
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->SetSecretKey("fakeSecret");

localHelper.CreateTestOlapTable();
testHelper.CreateTier("tier1");

for (ui64 i = 0; i < 100; ++i) {
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000);
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000);
}

testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
csController->WaitCompactions(TDuration::Seconds(5));
csController->WaitActualization(TDuration::Seconds(5));

csController->DisableBackground(NYDBTest::ICSController::EBackground::TTL);
testHelper.ResetTiering("/Root/olapStore/olapTable");
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(
SELECT
TierName, SUM(ColumnRawBytes) As RawBytes
FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats`
WHERE Activity == 1
GROUP BY TierName
)");

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1");
}

ui64 maxLevelValue;
{
auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
maxLevelValue = GetInt32(rows[0].at("level"));
}

{
auto result = testHelper.GetSession().ExecuteSchemeQuery(R"(DROP EXTERNAL DATA SOURCE `/Root/tier1`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(SELECT MAX(level) FROM `/Root/olapStore/olapTable`)");
auto it = tableClient.StreamExecuteScanQuery(selectQuery, NYdb::NTable::TStreamExecScanQuerySettings()).GetValueSync();
auto streamPart = it.ReadNext().GetValueSync();
UNIT_ASSERT(!streamPart.IsSuccess());
UNIT_ASSERT_STRING_CONTAINS(streamPart.GetIssues().ToString(), "cannot read blob range");
}

testHelper.CreateTier("tier1");
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetInt32(rows[0].at("level")), maxLevelValue);
}
}
}

} // namespace NKikimr::NKqp
14 changes: 9 additions & 5 deletions ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "storage.h"
#include "adapter.h"
#include "remove.h"
#include "write.h"
#include "read.h"
#include "gc.h"
#include "gc_actor.h"
#include "read.h"
#include "remove.h"
#include "storage.h"
#include "write.h"

#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/wrappers/unavailable_storage.h>

namespace NKikimr::NOlap::NBlobOperations::NTier {

Expand Down Expand Up @@ -57,7 +59,9 @@ void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* ti
if (!tierManager || !tierManager->IsReady()) {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings.reset();
ExternalStorageOperator = nullptr;
ExternalStorageOperator = std::make_shared<NWrappers::NExternalStorage::TUnavailableExternalStorageOperator>(
NWrappers::NExternalStorage::TUnavailableExternalStorageOperator(
"tier_unavailable", TStringBuilder() << "Tier is not configured: " << GetStorageId()));
return;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) {
}

void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
if (!Tiers->AreConfigsComplete()) {
if (Tiers->GetAwaitedConfigsCount()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "skip_switch_to_work")("reason", "tiering_metadata_not_ready");
return;
}
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ void TTxInit::Complete(const TActorContext& ctx) {
AFL_VERIFY(!Self->IsTxInitFinished);
Self->IsTxInitFinished = true;

for (const auto& [pathId, tiering] : Self->TablesManager.GetTtl()) {
Self->Tiers->EnablePathId(pathId, tiering.GetUsedTiers());
for (const auto& [pathId, _] : Self->TablesManager.GetTables()) {
if (const auto tiers = Self->TablesManager.GetUsedTiers(pathId); !tiers.empty()) {
Self->Tiers->EnablePathId(pathId, tiers);
}
}

Self->TrySwitchToWork(ctx);
Expand Down
19 changes: 4 additions & 15 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,25 +409,19 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
}

{
THashSet<NTiers::TExternalStorageId> usedTiers;
TTableInfo table(pathId);
if (tableProto.HasTtlSettings()) {
const auto& ttlSettings = tableProto.GetTtlSettings();
*tableVerProto.MutableTtlSettings() = ttlSettings;
if (ttlSettings.HasEnabled()) {
usedTiers = NOlap::TTiering::GetUsedTiers(ttlSettings.GetEnabled());
}
}
TablesManager.RegisterTable(std::move(table), db);
if (!usedTiers.empty()) {
ActivateTiering(pathId, usedTiers);
}
}

tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());

TablesManager.AddTableVersion(pathId, version, tableVerProto, schema, db);
InsertTable->RegisterPathInfo(pathId);
ActivateTiering(pathId);

Counters.GetTabletCounters()->SetCounter(COUNTER_TABLES, TablesManager.GetTables().size());
Counters.GetTabletCounters()->SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size());
Expand Down Expand Up @@ -455,19 +449,14 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
schema = alterProto.GetSchema();
}

THashSet<NTiers::TExternalStorageId> usedTiers;
if (alterProto.HasTtlSettings()) {
const auto& ttlSettings = alterProto.GetTtlSettings();
*tableVerProto.MutableTtlSettings() = ttlSettings;

if (ttlSettings.HasEnabled()) {
usedTiers = NOlap::TTiering::GetUsedTiers(ttlSettings.GetEnabled());
}
}
ActivateTiering(pathId, usedTiers);

tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
TablesManager.AddTableVersion(pathId, version, tableVerProto, schema, db);
ActivateTiering(pathId);
}

void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const NOlap::TSnapshot& version,
Expand Down Expand Up @@ -1592,9 +1581,9 @@ void TColumnShard::Handle(NOlap::NBlobOperations::NEvents::TEvDeleteSharedBlobs:
Execute(new TTxRemoveSharedBlobs(this, blobs, NActors::ActorIdFromProto(ev->Get()->Record.GetSourceActorId()), ev->Get()->Record.GetStorageId()), ctx);
}

void TColumnShard::ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers) {
void TColumnShard::ActivateTiering(const ui64 pathId) {
AFL_VERIFY(Tiers);
if (!usedTiers.empty()) {
if (const auto usedTiers = TablesManager.GetUsedTiers(pathId); !usedTiers.empty()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tier_count", usedTiers.size());
Tiers->EnablePathId(pathId, usedTiers);
} else {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
putStatus.OnYellowChannels(Executor());
}

void ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers);
void ActivateTiering(const ui64 pathId);
void OnTieringModified(const std::optional<ui64> pathId = {});

std::shared_ptr<TAtomicCounter> TabletActivityImpl = std::make_shared<TAtomicCounter>(0);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ class IColumnEngine {
}
virtual void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) = 0;
virtual void OnTieringModified(const THashMap<ui64, NOlap::TTiering>& ttl) = 0;
virtual THashSet<NColumnShard::NTiers::TExternalStorageId> GetPopulatedTiers(const ui64 pathId) const = 0;
};

} // namespace NKikimr::NOlap
15 changes: 15 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,21 @@ void TColumnEngineForLogs::OnTieringModified(const THashMap<ui64, NOlap::TTierin
}
}

THashSet<NColumnShard::NTiers::TExternalStorageId> TColumnEngineForLogs::GetPopulatedTiers(const ui64 pathId) const {
THashSet<NColumnShard::NTiers::TExternalStorageId> tiers;
auto* findTable = GranulesStorage->GetTables().FindPtr(pathId);
if (!findTable) {
return tiers;
}
for (const auto& [id, portion] : (*findTable)->GetPortions()) {
const TString& tier = portion->GetTierNameDef(NBlobOperations::TGlobal::DefaultStorageId);
if (tier != NBlobOperations::TGlobal::DefaultStorageId) {
tiers.emplace(tier);
}
}
return tiers;
}

void TColumnEngineForLogs::DoRegisterTable(const ui64 pathId) {
std::shared_ptr<TGranuleMeta> g = GranulesStorage->RegisterTable(pathId, SignalCounters.RegisterGranuleDataCounters(), VersionedIndex);
if (ActualizationStarted) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class TColumnEngineForLogs: public IColumnEngine {
void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override;
void OnTieringModified(const THashMap<ui64, NOlap::TTiering>& ttl) override;

virtual THashSet<NColumnShard::NTiers::TExternalStorageId> GetPopulatedTiers(const ui64 pathId) const override;

virtual std::shared_ptr<TVersionedIndex> CopyVersionedIndexPtr() const override {
return std::make_shared<TVersionedIndex>(VersionedIndex);
}
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,19 @@ class TTablesManager {
return result;
}

THashSet<NTiers::TExternalStorageId> GetUsedTiers(const ui64 pathId) const {
if (!PrimaryIndex) {
return {};
}
auto tiers = PrimaryIndex->GetPopulatedTiers(pathId);
if (const auto* ttl = Ttl.FindPtr(pathId)) {
const THashSet<NTiers::TExternalStorageId>& ttlTiers = ttl->GetUsedTiers();
tiers.insert(ttlTiers.begin(), ttlTiers.end());
}
AFL_CRIT(NKikimrServices::TX_TIERING)("aboba", "get_used_tiers")("tiers", tiers.size());
return tiers;
}

bool InitFromDB(NIceDb::TNiceDb& db);

const TTableInfo& GetTable(const ui64 pathId) const;
Expand Down
36 changes: 22 additions & 14 deletions ydb/core/tx/tiering/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,26 +84,33 @@ class TTiersManager::TActor: public TActorBootstrapped<TTiersManager::TActor> {
NTiers::TTierConfig tier;
if (const auto status = tier.DeserializeFromProto(description.GetExternalDataSourceDescription()); status.IsFail()) {
AFL_WARN(NKikimrServices::TX_TIERING)("event", "fetched_invalid_tier_settings")("error", status.GetErrorMessage());
OnTierFetchingError(tierId);
Owner->UpdateTierConfig(std::nullopt, tierId);
return;
}
Owner->UpdateTierConfig(tier, tierId);
} else {
AFL_WARN(false)("error", "invalid_object_type")("type", static_cast<ui64>(description.GetSelf().GetPathType()))("path", tierId.GetConfigPath());
OnTierFetchingError(tierId);
Owner->UpdateTierConfig(std::nullopt, tierId);
}
}

void Handle(NTiers::TEvNotifySchemeObjectDeleted::TPtr& ev) {
AFL_DEBUG(NKikimrServices::TX_TIERING)("component", "tiering_manager")("event", "object_deleted")("name", ev->Get()->GetObjectPath());
OnTierFetchingError(ev->Get()->GetObjectPath());
Owner->UpdateTierConfig(std::nullopt, ev->Get()->GetObjectPath());
}

void Handle(NTiers::TEvSchemeObjectResolutionFailed::TPtr& ev) {
const TString objectPath = ev->Get()->GetObjectPath();
AFL_WARN(NKikimrServices::TX_TIERING)("event", "object_resolution_failed")("path", objectPath)(
"reason", static_cast<ui64>(ev->Get()->GetReason()));
OnTierFetchingError(objectPath);
switch (ev->Get()->GetReason()) {
case NTiers::TEvSchemeObjectResolutionFailed::NOT_FOUND:
Owner->UpdateTierConfig(std::nullopt, objectPath);
break;
case NTiers::TEvSchemeObjectResolutionFailed::LOOKUP_ERROR:
OnTierFetchingError(objectPath);
break;
}
}

void Handle(NTiers::TEvWatchSchemeObject::TPtr& ev) {
Expand Down Expand Up @@ -282,6 +289,7 @@ void TTiersManager::EnablePathId(const ui64 pathId, const THashSet<NTiers::TExte
const auto& actorContext = NActors::TActivationContext::AsActorContext();
AFL_VERIFY(&actorContext)("error", "no_actor_context");
actorContext.Send(Actor->SelfId(), new NTiers::TEvWatchSchemeObject({ tierId.GetConfigPath() }));
AwaitedConfigs.emplace(tierId);
}
}
OnConfigsUpdated(false);
Expand All @@ -299,19 +307,19 @@ void TTiersManager::UpdateSecretsSnapshot(std::shared_ptr<NMetadata::NSecret::TS
OnConfigsUpdated();
}

void TTiersManager::UpdateTierConfig(const NTiers::TTierConfig& config, const NTiers::TExternalStorageId& tierId, const bool notifyShard) {
AFL_INFO(NKikimrServices::TX_TIERING)("event", "update_tier_config")("name", tierId.ToString())("tablet", TabletId);
TierConfigs[tierId] = config;
void TTiersManager::UpdateTierConfig(std::optional<NTiers::TTierConfig> config, const NTiers::TExternalStorageId& tierId, const bool notifyShard) {
AFL_INFO(NKikimrServices::TX_TIERING)("event", "update_tier_config")("name", tierId.ToString())("tablet", TabletId)("has_config", !!config);
if (config) {
TierConfigs[tierId] = *config;
} else {
TierConfigs.erase(tierId);
}
AwaitedConfigs.erase(tierId);
OnConfigsUpdated(notifyShard);
}

bool TTiersManager::AreConfigsComplete() const {
for (const auto& [tier, cnt] : TierRefCount) {
if (!TierConfigs.contains(tier)) {
return false;
}
}
return true;
ui64 TTiersManager::GetAwaitedConfigsCount() const {
return AwaitedConfigs.size();
}

TActorId TTiersManager::GetActorId() const {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/tiering/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class TTiersManager: public ITiersManager {
using TTierById = THashMap<NTiers::TExternalStorageId, NTiers::TTierConfig>;
YDB_READONLY_DEF(TTierById, TierConfigs);
YDB_READONLY_DEF(std::shared_ptr<NMetadata::NSecret::TSnapshot>, Secrets);
YDB_READONLY_DEF(THashSet<NTiers::TExternalStorageId>, AwaitedConfigs);

private:
void OnConfigsUpdated(bool notifyShard = true);
Expand All @@ -109,8 +110,8 @@ class TTiersManager: public ITiersManager {
void DisablePathId(const ui64 pathId);

void UpdateSecretsSnapshot(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets);
void UpdateTierConfig(const NTiers::TTierConfig& config, const NTiers::TExternalStorageId& tierId, const bool notifyShard = true);
bool AreConfigsComplete() const;
void UpdateTierConfig(std::optional<NTiers::TTierConfig> config, const NTiers::TExternalStorageId& tierId, const bool notifyShard = true);
ui64 GetAwaitedConfigsCount() const;

TString DebugString();

Expand Down
Loading

0 comments on commit 4e64227

Please sign in to comment.