Skip to content

Commit

Permalink
improve actualization (ydb-platform#3106)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Mar 24, 2024
1 parent 088ee76 commit 41ac5b7
Show file tree
Hide file tree
Showing 25 changed files with 163 additions and 72 deletions.
27 changes: 10 additions & 17 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1277,12 +1277,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();

// Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
Tests::NCommon::TLoggerInit(kikimr).SetComponents({NKikimrServices::TX_COLUMNSHARD}, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

std::vector<TString> uids;
std::vector<TString> resourceIds;
Expand Down Expand Up @@ -1341,15 +1342,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
const TInstant start = TInstant::Now();
AFL_VERIFY(!csController->GetActualizationsCount().Val());
while (TInstant::Now() - start < TDuration::Seconds(20) && !csController->GetActualizationsCount().Val()) {
Sleep(TDuration::Seconds(1));
Cerr << "waiting actualizations..." << Endl;
}
AFL_VERIFY(csController->GetActualizationsCount().Val());
}
csController->WaitActualization(TDuration::Seconds(10));
{
auto it = tableClient.StreamExecuteScanQuery(R"(
--!syntax_v1
Expand All @@ -1362,11 +1355,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << Endl;
Cerr << result << Endl;
Cerr << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << Endl;
CompareYson(result, R"([[0u;]])");
AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val() * 0.3);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val() * 0.4)
("approve", csController->GetIndexesApprovedOnSelect().Val())("skip", csController->GetIndexesSkippingOnSelect().Val());
}
}

Expand All @@ -1376,12 +1370,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TKikimrRunner kikimr(settings);

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();

// Tests::NCommon::TLoggerInit(kikimr).Initialize();

std::vector<TString> uids;
std::vector<TString> resourceIds;
std::vector<ui32> levels;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/blobs_action/bs/gc_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ void TGarbageCollectionActor::Handle(TEvBlobStorage::TEvCollectGarbageResult::TP
GCTask->OnGCResult(ev);
CheckFinished();
} else {
ACFL_ERROR()("event", "GC_ERROR")("details", ev->Get()->Print(true));
SendToBSProxy(NActors::TActivationContext::AsActorContext(), ev->Cookie, GCTask->BuildRequest(ev->Cookie).release(), ev->Cookie);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
Ev->Get()->IndexChanges->WriteIndexOnComplete(Self, context);
}

if (Ev->Get()->GetPutStatus() == NKikimrProto::TRYLATER) {
ctx.Schedule(Self->FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true));
} else {
Self->EnqueueBackgroundActivities(false, TriggerActivity);
}

Self->EnqueueBackgroundActivities(false, TriggerActivity);
changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self, Ev->Get()->GetPutStatus() == NKikimrProto::OK);
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexComplete(*changes, *Self);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ void TColumnShard::CleanupActors(const TActorContext& ctx) {

StoragesManager->Stop();
ExportsManager->Stop();
DataLocksManager->Stop();
if (Tiers) {
Tiers->Stop(true);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
}

void TColumnShard::OnTieringModified(const std::optional<ui64> pathId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified");
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")("path_id", pathId);
if (Tiers->IsReady()) {
StoragesManager->OnTieringModified(Tiers);
if (TablesManager.HasPrimaryIndex()) {
Expand Down
17 changes: 16 additions & 1 deletion ydb/core/tx/columnshard/data_locks/manager/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

namespace NKikimr::NOlap::NDataLocks {

void TManager::RegisterLock(const std::shared_ptr<ILock>& lock) {
std::shared_ptr<TManager::TGuard> TManager::RegisterLock(const std::shared_ptr<ILock>& lock) {
AFL_VERIFY(lock);
AFL_VERIFY(ProcessLocks.emplace(lock->GetLockName(), lock).second)("process_id", lock->GetLockName());
return std::make_shared<TGuard>(lock->GetLockName(), StopFlag);
}

void TManager::UnregisterLock(const TString& processId) {
Expand All @@ -30,4 +31,18 @@ std::optional<TString> TManager::IsLocked(const TGranuleMeta& granule) const {
return {};
}

void TManager::Stop() {
AFL_VERIFY(StopFlag->Inc() == 1);
}

TManager::TGuard::~TGuard() {
AFL_VERIFY(Released || !NActors::TlsActivationContext || StopFlag->Val() == 1);
}

void TManager::TGuard::Release(TManager& manager) {
AFL_VERIFY(!Released);
manager.UnregisterLock(ProcessId);
Released = true;
}

}
28 changes: 24 additions & 4 deletions ydb/core/tx/columnshard/data_locks/manager/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,35 @@ namespace NKikimr::NOlap::NDataLocks {
class TManager {
private:
THashMap<TString, std::shared_ptr<ILock>> ProcessLocks;
std::shared_ptr<TAtomicCounter> StopFlag = std::make_shared<TAtomicCounter>(0);
void UnregisterLock(const TString& processId);
public:
TManager() = default;

void RegisterLock(const std::shared_ptr<ILock>& lock);
void Stop();

class TGuard {
private:
const TString ProcessId;
std::shared_ptr<TAtomicCounter> StopFlag;
bool Released = false;
public:
TGuard(const TString& processId, const std::shared_ptr<TAtomicCounter>& stopFlag)
: ProcessId(processId)
, StopFlag(stopFlag)
{

}
~TGuard();

void Release(TManager& manager);
};

[[nodiscard]] std::shared_ptr<TGuard> RegisterLock(const std::shared_ptr<ILock>& lock);
template <class TLock, class ...Args>
void RegisterLock(Args&&... args) {
RegisterLock(std::make_shared<TLock>(args...));
[[nodiscard]] std::shared_ptr<TGuard> RegisterLock(Args&&... args) {
return RegisterLock(std::make_shared<TLock>(args...));
}
void UnregisterLock(const TString& processId);
std::optional<TString> IsLocked(const TPortionInfo& portion) const;
std::optional<TString> IsLocked(const TGranuleMeta& granule) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ bool TCommonSession::Start(const NColumnShard::TColumnShard& shard) {
IsStartedFlag = false;
}
if (IsStartedFlag) {
shard.GetDataLocksManager()->RegisterLock<NDataLocks::TListPortionsLock>("sharing_session:" + GetSessionId(), portionsLock, true);
AFL_VERIFY(!LockGuard);
LockGuard = shard.GetDataLocksManager()->RegisterLock<NDataLocks::TListPortionsLock>("sharing_session:" + GetSessionId(), portionsLock, true);
}
IsStartingFlag = false;
return IsStartedFlag;
Expand All @@ -57,7 +58,8 @@ void TCommonSession::Finish(const std::shared_ptr<NDataLocks::TManager>& dataLoc
AFL_VERIFY(!IsFinishedFlag);
IsFinishedFlag = true;
if (IsStartedFlag) {
dataLocksManager->UnregisterLock("sharing_session:" + GetSessionId());
AFL_VERIFY(LockGuard);
LockGuard->Release(*dataLocksManager);
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/data_sharing/common/session/common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/tx/columnshard/data_sharing/common/context/context.h>
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>

#include <ydb/library/accessor/accessor.h>
Expand Down Expand Up @@ -29,6 +30,7 @@ class TCommonSession {
YDB_READONLY_DEF(TString, SessionId);
const TString Info;
YDB_READONLY(ui64, RuntimeId, GetNextRuntimeId());
std::shared_ptr<NDataLocks::TManager::TGuard> LockGuard;
bool IsStartedFlag = false;
bool IsStartingFlag = false;
bool IsFinishedFlag = false;
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ void TColumnEngineChanges::Abort(NColumnShard::TColumnShard& self, TChangesFinis
}

void TColumnEngineChanges::Start(NColumnShard::TColumnShard& self) {
self.DataLocksManager->RegisterLock(BuildDataLock());
AFL_VERIFY(!LockGuard);
LockGuard = self.DataLocksManager->RegisterLock(BuildDataLock());
Y_ABORT_UNLESS(Stage == EStage::Created);
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(self.TabletID(), TypeString());
DoStart(self);
Expand All @@ -99,7 +100,9 @@ void TColumnEngineChanges::AbortEmergency() {
}

void TColumnEngineChanges::OnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) {
self.DataLocksManager->UnregisterLock(TypeString() + "::" + GetTaskIdentifier());
if (!!LockGuard) {
LockGuard->Release(*self.DataLocksManager);
}
DoOnFinish(self, context);
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/core/tx/columnshard/data_locks/locks/abstract.h>
#include <ydb/core/tx/columnshard/data_locks/locks/composite.h>
#include <ydb/core/tx/columnshard/data_locks/locks/list.h>
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
Expand Down Expand Up @@ -197,6 +198,7 @@ class TColumnEngineChanges {
};
private:
EStage Stage = EStage::Created;
std::shared_ptr<NDataLocks::TManager::TGuard> LockGuard;
protected:
virtual void DoDebugString(TStringOutput& out) const = 0;
virtual void DoCompile(TFinalizationContext& context) = 0;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/ttl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ void TTTLColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) {
auto& engine = self.MutableIndexAs<TColumnEngineForLogs>();
engine.GetActualizationController()->FinishActualization(RWAddress);
if (IsAborted()) {
THashMap<ui64, THashSet<ui64>> restoreIndexAddresses;
for (auto&& i : PortionsToEvict) {
AFL_VERIFY(restoreIndexAddresses[i.GetPortionInfo().GetPathId()].emplace(i.GetPortionInfo().GetPortionId()).second);
}
for (auto&& i : PortionsToRemove) {
AFL_VERIFY(restoreIndexAddresses[i.first.GetPathId()].emplace(i.first.GetPortionId()).second);
}
engine.ReturnToIndexes(restoreIndexAddresses);
}
}

std::optional<TPortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/ttl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class TTTLColumnEngineChanges: public TChangesWithAppend {
const auto pred = [](const TPortionForEviction& p) {
return p.GetPortionInfo().GetAddress();
};
return std::make_shared<NDataLocks::TListPortionsLock>(TypeString() + "::" + GetTaskIdentifier(), PortionsToEvict, pred);
return std::make_shared<NDataLocks::TListPortionsLock>(TypeString() + "::" + RWAddress.DebugString() + "::" + GetTaskIdentifier(), PortionsToEvict, pred);
}
public:
class TMemoryPredictorSimplePolicy: public IMemoryPredictor {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ class TColumnEngineForLogs : public IColumnEngine {
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
const std::shared_ptr<NDataLocks::TManager>& locksManager, const ui64 memoryUsageLimit) noexcept override;

void ReturnToIndexes(const THashMap<ui64, THashSet<ui64>>& portions) const {
for (auto&& [g, portionIds] : portions) {
auto it = Tables.find(g);
AFL_VERIFY(it != Tables.end());
it->second->ReturnToIndexes(portionIds);
}
}
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) noexcept override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ namespace NKikimr::NOlap::NActualizer {
class IActualizer {
protected:
virtual void DoAddPortion(const TPortionInfo& info, const TAddExternalContext& context) = 0;
virtual void DoRemovePortion(const TPortionInfo& info) = 0;
virtual void DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) const = 0;
virtual void DoRemovePortion(const ui64 portionId) = 0;
virtual void DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) = 0;
public:
virtual ~IActualizer() = default;
void BuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) {
return DoBuildTasks(tasksContext, externalContext, internalContext);
void ExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) {
return DoExtractTasks(tasksContext, externalContext, internalContext);
}
void AddPortion(const std::shared_ptr<TPortionInfo>& info, const TAddExternalContext& context) {
AFL_VERIFY(info);
Expand All @@ -22,12 +22,8 @@ class IActualizer {
}
return DoAddPortion(*info, context);
}
void RemovePortion(const std::shared_ptr<TPortionInfo>& info) {
AFL_VERIFY(info);
if (info->HasRemoveSnapshot()) {
return;
}
return DoRemovePortion(*info);
void RemovePortion(const ui64 portionId) {
return DoRemovePortion(portionId);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TTieringProcessContext;
class TAddExternalContext {
private:
YDB_READONLY_DEF(TInstant, Now);
YDB_ACCESSOR(bool, PortionExclusiveGuarantee, true);
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& Portions;
public:
TAddExternalContext(const TInstant now, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

namespace NKikimr::NOlap::NActualizer {

void TGranuleActualizationIndex::BuildActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const {
void TGranuleActualizationIndex::ExtractActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const {
TInternalTasksContext internalContext;
for (auto&& i : Actualizers) {
i->BuildTasks(tasksContext, externalContext, internalContext);
i->ExtractTasks(tasksContext, externalContext, internalContext);
}
}

Expand All @@ -20,7 +20,7 @@ void TGranuleActualizationIndex::AddPortion(const std::shared_ptr<TPortionInfo>&

void TGranuleActualizationIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
for (auto&& i : Actualizers) {
i->RemovePortion(portion);
i->RemovePortion(portion->GetPortionId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TGranuleActualizationIndex {
void Start();
TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex);

void BuildActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const;
void ExtractActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const;

void RefreshTiering(const std::optional<TTiering>& info, const TAddExternalContext& context);
void RefreshScheme(const TAddExternalContext& context);
Expand Down
Loading

0 comments on commit 41ac5b7

Please sign in to comment.