diff --git a/ydb/core/blobstorage/base/blobstorage_shred_events.h b/ydb/core/blobstorage/base/blobstorage_shred_events.h index 12750eb5a22d..fea1dc9ae706 100644 --- a/ydb/core/blobstorage/base/blobstorage_shred_events.h +++ b/ydb/core/blobstorage/base/blobstorage_shred_events.h @@ -19,6 +19,12 @@ namespace NKikimr { struct TEvBlobStorage::TEvControllerShredResponse : TEventPB { TEvControllerShredResponse() = default; + + TEvControllerShredResponse(ui64 generation, bool completed, ui32 progress) { + Record.SetCurrentGeneration(generation); + Record.SetCompleted(completed); + Record.SetProgress10k(progress); + } }; } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index c3c0c8ac302d..1d081ef9e8ed 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1967,6 +1967,7 @@ message TDataErasureConfig { // Every 10 m do request to BSC optional uint64 BlobStorageControllerRequestIntervalSeconds = 5 [default = 600]; optional TTenantDataErasureConfig TenantDataErasureConfig = 6; + optional bool ForceManualStartup = 7 [default = false]; } message TGraphConfig { diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 182e575c30f8..5de3fe6542f1 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -628,8 +628,11 @@ enum ETxTypes { TXTYPE_LIST_USERS = 90 [(TxTypeOpts) = {Name: "TxListUsers"}]; TXTYPE_UNMARK_RESTORE_TABLES = 91 [(TxTypeOpts) = {Name: "TxUnmarkRestoreTables"}]; - TXTYPE_RUN_TENANT_DATA_ERASURE = 92 [(TxTypeOpts) = {Name: "TxRunTenantDataErasure"}]; + TXTYPE_DATA_ERASURE_INIT = 92 [(TxTypeOpts) = {Name: "TxDataErasureInit"}]; TXTYPE_RUN_DATA_ERASURE = 93 [(TxTypeOpts) = {Name: "TxRunDataErasure"}]; + TXTYPE_RUN_DATA_ERASURE_TENANT = 94 [(TxTypeOpts) = {Name: "TxRunTenantDataErasure"}]; - TXTYPE_COMPLETE_DATA_ERASURE = 94 [(TxTypeOpts) = {Name: "TxCompleteDataErasure"}]; + TXTYPE_COMPLETE_DATA_ERASURE_SHARD = 95 [(TxTypeOpts) = {Name: "TxCompleteDataErasureShard"}]; + TXTYPE_COMPLETE_DATA_ERASURE_TENANT = 96 [(TxTypeOpts) = {Name: "TxCompleteDataErasureTenant"}]; + TXTYPE_COMPLETE_DATA_ERASURE_BSC = 97 [(TxTypeOpts) = {Name: "TxCompleteDataErasureBSC"}]; } diff --git a/ydb/core/protos/flat_tx_scheme.proto b/ydb/core/protos/flat_tx_scheme.proto index c7cad4462295..c8d9d2f8d16d 100644 --- a/ydb/core/protos/flat_tx_scheme.proto +++ b/ydb/core/protos/flat_tx_scheme.proto @@ -472,3 +472,6 @@ message TEvDataErasureInfoResponse { optional EStatus Status = 1; optional uint64 Generation = 2; } + +message TEvDataErasureManualStartupRequest { +} diff --git a/ydb/core/tx/schemeshard/schemeshard.h b/ydb/core/tx/schemeshard/schemeshard.h index 26994c216b19..2bd067c1f0c5 100644 --- a/ydb/core/tx/schemeshard/schemeshard.h +++ b/ydb/core/tx/schemeshard/schemeshard.h @@ -103,10 +103,10 @@ namespace TEvSchemeShard { EvWakeupToRunDataErasure, EvMeasureDataErasureBSC, EvWakeupToRunDataErasureBSC, - EvRunDataErasure, EvCompleteDataErasure, EvDataErasureInfoRequest, EvDataErasureInfoResponse, + EvDataErasureManualStartupRequest, EvEnd }; @@ -689,16 +689,6 @@ namespace TEvSchemeShard { TEvListUsersResult() = default; }; - struct TEvRunDataErasure : TEventLocal { - const ui64 Generation; - const TInstant StartTime; - - TEvRunDataErasure(ui64 generation, const TInstant& startTime) - : Generation(generation) - , StartTime(startTime) - {} - }; - struct TEvTenantDataErasureRequest : TEventPB { TEvTenantDataErasureRequest() = default; @@ -768,6 +758,8 @@ namespace TEvSchemeShard { } } }; + + struct TEvDataErasureManualStartupRequest : TEventPB {}; }; } diff --git a/ydb/core/tx/schemeshard/schemeshard__data_erasure.cpp b/ydb/core/tx/schemeshard/schemeshard__data_erasure.cpp deleted file mode 100644 index bfddd4360d6e..000000000000 --- a/ydb/core/tx/schemeshard/schemeshard__data_erasure.cpp +++ /dev/null @@ -1,320 +0,0 @@ -#include "schemeshard_impl.h" - -#include - -namespace NKikimr::NSchemeShard { - -NOperationQueue::EStartStatus TSchemeShard::StartDataErasure(const TPathId& pathId) { - UpdateDataErasureQueueMetrics(); - - auto ctx = ActorContext(); - - auto it = SubDomains.find(pathId); - if (it == SubDomains.end()) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[DataErasure] [Start] Failed to resolve subdomain info " - "for pathId# " << pathId - << " at schemeshard# " << TabletID()); - - return NOperationQueue::EStartStatus::EOperationRemove; - } - - const auto& tenantSchemeShardId = it->second->GetTenantSchemeShardID(); - - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[DataErasure] [Start] Data erasure " - "for pathId# " << pathId - << ", tenant schemeshard# " << tenantSchemeShardId - << ", next wakeup# " << DataErasureQueue->GetWakeupDelta() - << ", rate# " << DataErasureQueue->GetRate() - << ", in queue# " << DataErasureQueue->Size() << " tenants" - << ", running# " << DataErasureQueue->RunningSize() << " tenants" - << " at schemeshard " << TabletID()); - - std::unique_ptr request( - new TEvSchemeShard::TEvTenantDataErasureRequest(DataErasureGeneration)); - - RunningDataErasureTenants[pathId] = PipeClientCache->Send( - ctx, - ui64(tenantSchemeShardId), - request.release()); - - return NOperationQueue::EStartStatus::EOperationRunning; -} - -void TSchemeShard::OnDataErasureTimeout(const TPathId& pathId) { - UpdateDataErasureQueueMetrics(); - TabletCounters->Cumulative()[COUNTER_DATA_ERASURE_TIMEOUT].Increment(1); - - RunningDataErasureTenants.erase(pathId); - - auto ctx = ActorContext(); - - if (!SubDomains.contains(pathId)) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[DataErasure] [Timeout] Failed to resolve subdomain info " - "for path# " << pathId - << " at schemeshard# " << TabletID()); - return; - } - - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[DataErasure] [Timeout] Data erasure timeouted " - "for pathId# " << pathId - << ", next wakeup in# " << DataErasureQueue->GetWakeupDelta() - << ", rate# " << DataErasureQueue->GetRate() - << ", in queue# " << DataErasureQueue->Size() << " tenants" - << ", running# " << DataErasureQueue->RunningSize() << " tenants" - << " at schemeshard " << TabletID()); - - // retry - EnqueueDataErasure(pathId); -} - -void TSchemeShard::EnqueueDataErasure(const TPathId& pathId) { - if (!DataErasureQueue) - return; - - auto ctx = ActorContext(); - - if (DataErasureQueue->Enqueue(pathId)) { - LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[DataErasure] [Enqueue] Enqueued pathId# " << pathId << " at schemeshard " << TabletID()); - UpdateDataErasureQueueMetrics(); - } else { - LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[DataErasure] [Enqueue] Skipped or already exists pathId# " << pathId << " at schemeshard " << TabletID()); - } -} - -void TSchemeShard::DataErasureHandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { - const auto shardIdx = GetShardIdx(tabletId); - if (!ShardInfos.contains(shardIdx)) { - return; - } - - const auto& pathId = ShardInfos.at(shardIdx).PathId; - if (!TTLEnabledTables.contains(pathId)) { - return; - } - - const auto it = RunningDataErasureTenants.find(pathId); - if (it == RunningDataErasureTenants.end()) { - return; - } - - if (it->second != clientId) { - return; - } - - RunningDataErasureTenants.erase(pathId); - - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[DataErasure] [Disconnect] Data erasure disconnect " - "to tablet: " << tabletId - << ", at schemeshard: " << TabletID()); - - StartDataErasure(pathId); -} - -void TSchemeShard::Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - - if (record.GetGeneration() != DataErasureGeneration) { - return; - } - - Execute(CreateTxCompleteDataErasure(ev), ctx); - - auto pathId = TPathId( - record.GetPathId().GetOwnerId(), - record.GetPathId().GetLocalId()); - - auto duration = DataErasureQueue->OnDone(pathId); - - if (!SubDomains.contains(pathId)) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[DataErasure] [Finished] Failed to resolve subdomain info " - "for pathId# " << pathId - << " in# " << duration.MilliSeconds() << " ms" - << ", next wakeup in# " << DataErasureQueue->GetWakeupDelta() - << ", rate# " << DataErasureQueue->GetRate() - << ", in queue# " << DataErasureQueue->Size() << " tenants" - << ", running# " << DataErasureQueue->RunningSize() << " tenants" - << " at schemeshard " << TabletID()); - } else { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[DataErasure] [Finished] Data erasure completed " - "for pathId# " << pathId - << " in# " << duration.MilliSeconds() << " ms" - << ", next wakeup# " << DataErasureQueue->GetWakeupDelta() - << ", rate# " << DataErasureQueue->GetRate() - << ", in queue# " << DataErasureQueue->Size() << " tenants" - << ", running# " << DataErasureQueue->RunningSize() << " tenants" - << " at schemeshard " << TabletID()); - } - - ActiveDataErasureTenants.erase(pathId); - RunningDataErasureTenants.erase(pathId); - - TabletCounters->Cumulative()[COUNTER_DATA_ERASURE_OK].Increment(1); - UpdateDataErasureQueueMetrics(); - - bool isDataErasureCompleted = true; - for (const auto& [pathId, status] : ActiveDataErasureTenants) { - if (status == EDataErasureStatus::IN_PROGRESS) { - isDataErasureCompleted = false; - break; - } - } - - if (isDataErasureCompleted) { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Data erasure in tenants is completed. Send request to BS controller"); - DataErasureScheduler->SetStatus(TDataErasureScheduler::EStatus::IN_PROGRESS_BSC); - std::unique_ptr request( - new TEvBlobStorage::TEvControllerShredRequest(DataErasureGeneration)); - - PipeClientCache->Send(ctx, MakeBSControllerID(), request.release()); - } -} - -void TSchemeShard::Handle(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - - if (record.GetCurrentGeneration() != DataErasureGeneration) { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Handle TEvControllerShredResponse: Get unexpected generation " << record.GetCurrentGeneration()); - return; - } - - if (record.GetCompleted()) { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Handle TEvControllerShredResponse: Data shred in BSC is completed"); - DataErasureScheduler->CompleteDataErasure(ctx); - } else { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Handle TEvControllerShredResponse: Progress data shred in BSC " << record.GetProgress10k()); - ctx.Schedule(DataErasureScheduler->GetDataErasureBSCInterval(), new TEvSchemeShard::TEvWakeupToRunDataErasure); - } -} - -void TSchemeShard::Handle(TEvSchemeShard::TEvWakeupToRunDataErasureBSC::TPtr& ev, const NActors::TActorContext& ctx) { - Y_UNUSED(ev); - std::unique_ptr request( - new TEvBlobStorage::TEvControllerShredRequest(DataErasureGeneration)); - - PipeClientCache->Send(ctx, MakeBSControllerID(), request.release()); -} - -void TSchemeShard::UpdateDataErasureQueueMetrics() { - if (!DataErasureQueue) { - return; - } - - TabletCounters->Simple()[COUNTER_DATA_ERASURE_QUEUE_SIZE].Set(DataErasureQueue->Size()); - TabletCounters->Simple()[COUNTER_DATA_ERASURE_QUEUE_RUNNING].Set(DataErasureQueue->RunningSize()); -} - -struct TSchemeShard::TTxRunDataErasure : public TSchemeShard::TRwTxBase { - ui64 RequestedGeneration; - TInstant StartTime; - - TTxRunDataErasure(TSelf *self, ui64 generation, const TInstant& startTime) - : TRwTxBase(self) - , RequestedGeneration(generation) - , StartTime(startTime) - {} - - TTxType GetTxType() const override { return TXTYPE_RUN_DATA_ERASURE; } - - void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxRunDataErasure Execute at schemeshard: " << Self->TabletID()); - NIceDb::TNiceDb db(txc.DB); - if (Self->DataErasureGeneration < RequestedGeneration) { - Self->DataErasureGeneration = RequestedGeneration; - Self->DataErasureQueue->Clear(); - Self->ActiveDataErasureTenants.clear(); - Self->RunningDataErasureTenants.clear(); - for (auto& [pathId, subdomain] : Self->SubDomains) { - auto path = TPath::Init(pathId, Self); - if (path->IsRoot()) { - continue; - } - if (subdomain->GetTenantSchemeShardID() == InvalidTabletId) { // no tenant schemeshard - continue; - } - Self->DataErasureQueue->Enqueue(pathId); - Self->ActiveDataErasureTenants[pathId] = EDataErasureStatus::IN_PROGRESS; - db.Table().Key(Self->DataErasureGeneration).Update(static_cast(Self->DataErasureScheduler->GetStatus()), - StartTime.MicroSeconds()); - db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Update(static_cast(Self->ActiveDataErasureTenants[pathId])); - } - } else if (Self->DataErasureGeneration == RequestedGeneration) { - Self->DataErasureQueue->Clear(); - Self->RunningDataErasureTenants.clear(); - for (const auto& [pathId, status] : Self->ActiveDataErasureTenants) { - if (status == EDataErasureStatus::IN_PROGRESS) { - Self->DataErasureQueue->Enqueue(pathId); - } - } - } - } - void DoComplete(const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxRunDataErasure Complete at schemeshard: " << Self->TabletID()); - } -}; - -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxRunDataErasure(ui64 generation, const TInstant& startTime) { - return new TTxRunDataErasure(this, generation, startTime); -} - -struct TSchemeShard::TTxCompleteDataErasure : public TSchemeShard::TRwTxBase { - const TEvSchemeShard::TEvTenantDataErasureResponse::TPtr Ev; - - TTxCompleteDataErasure(TSelf* self, const TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev) - : TRwTxBase(self) - , Ev(std::move(ev)) - {} - - TTxType GetTxType() const override { return TXTYPE_COMPLETE_DATA_ERASURE; } - - void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasure Execute at schemeshard: " << Self->TabletID()); - - NIceDb::TNiceDb db(txc.DB); - const auto& record = Ev->Get()->Record; - auto pathId = TPathId( - record.GetPathId().GetOwnerId(), - record.GetPathId().GetLocalId()); - Self->ActiveDataErasureTenants[pathId] = EDataErasureStatus::COMPLETED; - db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Update(static_cast(Self->ActiveDataErasureTenants[pathId])); - } - - void DoComplete(const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxCompleteDataErasure Complete at schemeshard: " << Self->TabletID()); - } -}; - -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasure(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev) { - return new TTxCompleteDataErasure(this, ev); -} - -struct TSchemeShard::TTxDataErasureSchedulerInit : public TSchemeShard::TRwTxBase { - TTxDataErasureSchedulerInit(TSelf* self) - : TRwTxBase(self) - {} - - void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxDataErasureSchedulerInit Execute at schemeshard: " << Self->TabletID()); - NIceDb::TNiceDb db(txc.DB); - db.Table().Key(0).Update(static_cast(TDataErasureScheduler::EStatus::COMPLETED), AppData(ctx)->TimeProvider->Now().MicroSeconds()); - } - - void DoComplete(const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxDataErasureSchedulerInit Complete at schemeshard: " << Self->TabletID()); - } -}; - -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxDataErasureSchedulerInit() { - return new TTxDataErasureSchedulerInit(this); -} - -} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.cpp b/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.cpp new file mode 100644 index 000000000000..7ee61d69a681 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.cpp @@ -0,0 +1,34 @@ +#include "schemeshard__data_erasure_manager.h" + +namespace NKikimr::NSchemeShard { + +TDataErasureManager::TDataErasureManager(TSchemeShard* const schemeShard) + : SchemeShard(schemeShard) +{} + +TDataErasureManager::EStatus TDataErasureManager::GetStatus() const { + return Status; +} + +void TDataErasureManager::SetStatus(const EStatus& status) { + Status = status; +} + +void TDataErasureManager::IncGeneration() { + ++Generation; +} + +void TDataErasureManager::SetGeneration(ui64 generation) { + Generation = generation; +} + +ui64 TDataErasureManager::GetGeneration() const { + return Generation; +} + +void TDataErasureManager::Clear() { + ClearOperationQueue(); + ClearWaitingDataErasureRequests(); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h b/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h new file mode 100644 index 000000000000..8fe7a5a3ee7d --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__data_erasure_manager.h @@ -0,0 +1,199 @@ +#pragma once + +#include + +#include + +#include +#include +#include + +#include + +namespace NKikimrConfig { + +class TDataErasureConfig; + +} // NKikimrConfig + +namespace NKikimr::NSchemeShard { + +class TSchemeShard; + +class TDataErasureManager { +public: + enum class EStatus : ui32 { + UNSPECIFIED, + COMPLETED, + IN_PROGRESS, + IN_PROGRESS_BSC, + }; + +protected: + TSchemeShard* const SchemeShard; + EStatus Status = EStatus::UNSPECIFIED; + ui64 Generation = 0; + +public: + TDataErasureManager(TSchemeShard* const schemeShard); + + virtual ~TDataErasureManager() = default; + + virtual void UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) = 0; + virtual void Start() = 0; + virtual void Stop() = 0; + virtual void ClearOperationQueue() = 0; + virtual void ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) = 0; + virtual void ClearWaitingDataErasureRequests() = 0; + virtual void WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) = 0; + virtual void Run(NIceDb::TNiceDb& db) = 0; + virtual void Continue() = 0; + virtual void HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) = 0; + virtual void OnDone(const TPathId& pathId, NIceDb::TNiceDb& db) = 0; + virtual void OnDone(const TTabletId& tabletId, NIceDb::TNiceDb& db) = 0; + virtual void ScheduleRequestToBSC() = 0; + virtual void SendRequestToBSC() = 0; + virtual void Complete() = 0; + virtual bool Restore(NIceDb::TNiceDb& db) = 0; + virtual void Remove(const TPathId& pathId) = 0; + + void Clear(); + + EStatus GetStatus() const; + void SetStatus(const EStatus& status); + + void IncGeneration(); + void SetGeneration(ui64 generation); + ui64 GetGeneration() const; +}; + +//////////////////// TRootDataErasureManager //////////////////// + +class TRootDataErasureManager : public TDataErasureManager { +private: +using TQueue = NOperationQueue::TOperationQueueWithTimer< + TPathId, + TFifoQueue, + TEvPrivate::EvRunDataErasure, + NKikimrServices::FLAT_TX_SCHEMESHARD, + NKikimrServices::TActivity::DATA_ERASURE>; + + class TStarter : public TQueue::IStarter { + public: + TStarter(TRootDataErasureManager* const manager); + + NOperationQueue::EStartStatus StartOperation(const TPathId&) override; + void OnTimeout(const TPathId&) override; + + private: + TRootDataErasureManager* const Manager; + }; + +private: + TStarter Starter; + TQueue* Queue = nullptr; + THashMap WaitingDataErasureTenants; + THashMap ActivePipes; + + TDuration DataErasureInterval; + TDuration DataErasureBSCInterval; + TDuration CurrentWakeupInterval; + bool IsDataErasureWakeupScheduled = false; + bool IsRequestToBSCScheduled = false; + TInstant StartTime; + TInstant FinishTime; + + TTabletId BSC; + bool IsManualStartup = false; + +public: + TRootDataErasureManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config); + + void UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) override; + void Start() override; + void Stop() override; + void ClearOperationQueue() override; + void ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) override; + void ClearWaitingDataErasureRequests() override; + void WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) override; + void Run(NIceDb::TNiceDb& db) override; + void Continue() override; + void HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) override; + void OnDone(const TPathId& pathId, NIceDb::TNiceDb& db) override; + void OnDone(const TTabletId& tabletId, NIceDb::TNiceDb& db) override; + void ScheduleRequestToBSC() override; + void SendRequestToBSC() override; + void Complete() override; + bool Restore(NIceDb::TNiceDb& db) override; + void Remove(const TPathId& pathId) override; + +private: + static TQueue::TConfig ConvertConfig(const NKikimrConfig::TDataErasureConfig& config); + + void ScheduleDataErasureWakeup(); + NOperationQueue::EStartStatus StartDataErasure(const TPathId& pathId); + void OnTimeout(const TPathId& pathId); + void Enqueue(const TPathId& pathId); + void UpdateMetrics(); +}; + +//////////////////// TTenantDataErasureManager //////////////////// + +class TTenantDataErasureManager : public TDataErasureManager { +private: +using TQueue = NOperationQueue::TOperationQueueWithTimer< + TShardIdx, + TFifoQueue, + TEvPrivate::EvRunTenantDataErasure, + NKikimrServices::FLAT_TX_SCHEMESHARD, + NKikimrServices::TActivity::TENANT_DATA_ERASURE>; + + class TStarter : public TQueue::IStarter { + public: + TStarter(TTenantDataErasureManager* const manager); + + NOperationQueue::EStartStatus StartOperation(const TShardIdx& shardIdx) override; + void OnTimeout(const TShardIdx& shardIdx) override; + + private: + TTenantDataErasureManager* const Manager; + }; + +private: + TStarter Starter; + TQueue* Queue = nullptr; + THashMap WaitingDataErasureShards; + THashMap ActivePipes; + +public: + TTenantDataErasureManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config); + + void UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) override; + void Start() override; + void Stop() override; + void ClearOperationQueue() override; + void ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) override; + void ClearWaitingDataErasureRequests() override; + void WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) override; + void Run(NIceDb::TNiceDb& db) override; + void Continue() override; + void HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) override; + void OnDone(const TPathId& pathId, NIceDb::TNiceDb& db) override; + void OnDone(const TTabletId& tabletId, NIceDb::TNiceDb& db) override; + void ScheduleRequestToBSC() override; + void SendRequestToBSC() override; + void Complete() override; + bool Restore(NIceDb::TNiceDb& db) override; + void Remove(const TPathId& pathId) override; + +private: + static TQueue::TConfig ConvertConfig(const NKikimrConfig::TDataErasureConfig& config); + + NOperationQueue::EStartStatus StartDataErasure(const TShardIdx& shardIdx); + void OnTimeout(const TShardIdx& shardIdx); + void Enqueue(const TShardIdx& shardIdx); + void UpdateMetrics(); + void SendResponseToRootSchemeShard(); +}; + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 9e8f47109db3..3b4b11d9f247 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -1868,119 +1868,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase { // Read Running data erasure for tenants { - if (Self->IsDomainSchemeShard) { - { - auto rowset = db.Table().Range().Select(); - if (!rowset.IsReady()) { - return false; - } - if (rowset.EndOfSet()) { - Self->DataErasureScheduler->Restore({.IsInitialized = false, .StartTime = AppData(ctx)->TimeProvider->Now()}, ctx); - } else { - ui64 currentGeneration = 0; - TInstant startTime; - TDataErasureScheduler::EStatus status = TDataErasureScheduler::EStatus::COMPLETED; - while (!rowset.EndOfSet()) { - ui64 generation = rowset.GetValue(); - if (generation >= currentGeneration) { - currentGeneration = generation; - startTime = TInstant::FromValue(rowset.GetValue()); - ui32 statusValue = rowset.GetValue(); - if (statusValue >= static_cast(TDataErasureScheduler::EStatus::UNSPECIFIED) && - statusValue <= static_cast(TDataErasureScheduler::EStatus::IN_PROGRESS_BSC)) { - status = static_cast(statusValue); - } - } - - if (!rowset.Next()) { - return false; - } - } - Self->DataErasureScheduler->Restore({.IsInitialized = true, - .Generation = currentGeneration, - .Status = status, - .StartTime = startTime}, ctx); - Self->DataErasureGeneration = currentGeneration; - } - } - - { - auto rowset = db.Table().Range().Select(); - if (!rowset.IsReady()) - return false; - while (!rowset.EndOfSet()) { - TOwnerId ownerPathId = rowset.GetValue(); - TLocalPathId localPathId = rowset.GetValue(); - TPathId pathId(ownerPathId, localPathId); - Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); - TPathElement::TPtr path = Self->PathsById.at(pathId); - Y_VERIFY_S(path->IsDomainRoot(), "Path is not a subdomain, pathId: " << pathId); - - Y_ABORT_UNLESS(Self->SubDomains.contains(pathId)); - - ui32 statusValue = rowset.GetValue(); - EDataErasureStatus status = TSchemeShard::EDataErasureStatus::COMPLETED; - if (statusValue >= static_cast(TSchemeShard::EDataErasureStatus::UNSPECIFIED) && - statusValue <= static_cast(TSchemeShard::EDataErasureStatus::IN_PROGRESS)) { - status = static_cast(statusValue); - } - - Self->ActiveDataErasureTenants[pathId] = status; - - if (!rowset.Next()) { - return false; - } - } - } - } else { - { - auto rowset = db.Table().Range().Select(); - if (!rowset.IsReady()) { - return false; - } - while (!rowset.EndOfSet()) { - ui64 generation = rowset.GetValue(); - if (generation > Self->DataErasureGeneration) { - Self->DataErasureGeneration = generation; - ui32 statusValue = rowset.GetValue(); - EDataErasureStatus status = TSchemeShard::EDataErasureStatus::COMPLETED; - if (statusValue >= static_cast(TSchemeShard::EDataErasureStatus::UNSPECIFIED) && - statusValue <= static_cast(TSchemeShard::EDataErasureStatus::IN_PROGRESS)) { - status = static_cast(statusValue); - } - Self->DataErasureStatus = status; - } - - if (!rowset.Next()) { - return false; - } - } - } - - { - auto rowset = db.Table().Range().Select(); - if (!rowset.IsReady()) { - return false; - } - while (!rowset.EndOfSet()) { - TOwnerId ownerId = rowset.GetValue(); - TLocalShardIdx localShardId = rowset.GetValue(); - TShardIdx shardId(ownerId, localShardId); - - ui32 statusValue = rowset.GetValue(); - EDataErasureStatus status = TSchemeShard::EDataErasureStatus::COMPLETED; - if (statusValue >= static_cast(TSchemeShard::EDataErasureStatus::UNSPECIFIED) && - statusValue <= static_cast(TSchemeShard::EDataErasureStatus::IN_PROGRESS)) { - status = static_cast(statusValue); - } - Self->ActiveDataErasureShards[shardId] = status; - - if (!rowset.Next()) { - return false; - } - } - } - } + // if (!Self->DataErasureManager->Restore(db)) { + // return false; + // } } // Read External Tables diff --git a/ydb/core/tx/schemeshard/schemeshard__root_data_erasure_manager.cpp b/ydb/core/tx/schemeshard/schemeshard__root_data_erasure_manager.cpp new file mode 100644 index 000000000000..9c0ca52c58ba --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__root_data_erasure_manager.cpp @@ -0,0 +1,668 @@ +#include "schemeshard__data_erasure_manager.h" + +#include + +namespace NKikimr::NSchemeShard { + +TRootDataErasureManager::TStarter::TStarter(TRootDataErasureManager* const manager) + : Manager(manager) +{} + +NOperationQueue::EStartStatus TRootDataErasureManager::TStarter::StartOperation(const TPathId& pathId) { + return Manager->StartDataErasure(pathId); +} + +void TRootDataErasureManager::TStarter::OnTimeout(const TPathId& pathId) { + Manager->OnTimeout(pathId); +} + +TRootDataErasureManager::TRootDataErasureManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config) + : TDataErasureManager(schemeShard) + , Starter(this) + , Queue(new TQueue(ConvertConfig(config), Starter)) + , DataErasureInterval(TDuration::Seconds(config.GetDataErasureIntervalSeconds())) + , DataErasureBSCInterval(TDuration::Seconds(config.GetBlobStorageControllerRequestIntervalSeconds())) + , CurrentWakeupInterval(DataErasureInterval) + , BSC(MakeBSControllerID()) + , IsManualStartup(config.GetForceManualStartup()) +{ + const auto ctx = SchemeShard->ActorContext(); + ctx.RegisterWithSameMailbox(Queue); + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Created: Timeout# " << config.GetTimeoutSeconds() + << ", Rate# " << Queue->GetRate() + << ", InflightLimit# " << config.GetInflightLimit() + << ", DataErasureInterval# " << DataErasureInterval + << ", DataErasureBSCInterval# " << DataErasureBSCInterval + << ", CurrentWakeupInterval# " << CurrentWakeupInterval + << ", IsManualStartup# " << (IsManualStartup ? "true" : "false")); +} + +void TRootDataErasureManager::UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) { + TRootDataErasureManager::TQueue::TConfig queueConfig = ConvertConfig(config); + Queue->UpdateConfig(queueConfig); + DataErasureInterval = TDuration::Seconds(config.GetDataErasureIntervalSeconds()); + DataErasureBSCInterval = TDuration::Seconds(config.GetBlobStorageControllerRequestIntervalSeconds()); + CurrentWakeupInterval = DataErasureInterval; + BSC = TTabletId(MakeBSControllerID()); + IsManualStartup = config.GetForceManualStartup(); + + const auto ctx = SchemeShard->ActorContext(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Config updated: Timeout# " << queueConfig.Timeout + << ", Rate# " << Queue->GetRate() + << ", InflightLimit# " << queueConfig.InflightLimit + << ", DataErasureInterval# " << DataErasureInterval + << ", DataErasureBSCInterval# " << DataErasureBSCInterval + << ", CurrentWakeupInterval# " << CurrentWakeupInterval + << ", IsManualStartup# " << (IsManualStartup ? "true" : "false")); +} + +void TRootDataErasureManager::Start() { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Start: Status# " << static_cast(Status)); + + Queue->Start(); + if (Status == EStatus::UNSPECIFIED) { + SchemeShard->MarkFirstRunRootDataErasureManager(); + ScheduleDataErasureWakeup(); + } else if (Status == EStatus::COMPLETED) { + ScheduleDataErasureWakeup(); + } else { + ClearOperationQueue(); + Continue(); + } +} + +void TRootDataErasureManager::Stop() { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Stop"); + + Queue->Stop(); +} + +void TRootDataErasureManager::ClearOperationQueue() { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Clear operation queue and active pipes"); + + Queue->Clear(); + ActivePipes.clear(); +} + +void TRootDataErasureManager::ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Clear WaitingDataErasureTenants: Size# " << WaitingDataErasureTenants.size()); + + for (const auto& [pathId, status] : WaitingDataErasureTenants) { + db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); + } + ClearWaitingDataErasureRequests(); +} + +void TRootDataErasureManager::ClearWaitingDataErasureRequests() { + WaitingDataErasureTenants.clear(); +} + +void TRootDataErasureManager::Run(NIceDb::TNiceDb& db) { + Status = EStatus::IN_PROGRESS; + StartTime =AppData(SchemeShard->ActorContext())->TimeProvider->Now(); + for (auto& [pathId, subdomain] : SchemeShard->SubDomains) { + auto path = TPath::Init(pathId, SchemeShard); + if (path->IsRoot()) { + continue; + } + if (subdomain->GetTenantSchemeShardID() == InvalidTabletId) { // no tenant schemeshard + continue; + } + Enqueue(pathId); + WaitingDataErasureTenants[pathId] = EStatus::IN_PROGRESS; + db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Update(static_cast(WaitingDataErasureTenants[pathId])); + } + if (WaitingDataErasureTenants.empty()) { + Status = EStatus::IN_PROGRESS_BSC; + } + db.Table().Key(Generation).Update(static_cast(Status), StartTime.MicroSeconds()); + + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Run: Queue.Size# " << Queue->Size() + << ", WaitingDataErasureTenants.size# " << WaitingDataErasureTenants.size() + << ", Status# " << static_cast(Status)); +} + +void TRootDataErasureManager::Continue() { + if (Status == EStatus::IN_PROGRESS) { + for (const auto& [pathId, status] : WaitingDataErasureTenants) { + if (status == EStatus::IN_PROGRESS) { + Enqueue(pathId); + } + } + } else if (Status == EStatus::IN_PROGRESS_BSC) { + SendRequestToBSC(); + } + + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Continue: Queue.Size# " << Queue->Size() + << ", Status# " << static_cast(Status)); +} + +void TRootDataErasureManager::ScheduleDataErasureWakeup() { + if (IsManualStartup || IsDataErasureWakeupScheduled) { + return; + } + + const auto ctx = SchemeShard->ActorContext(); + ctx.Schedule(CurrentWakeupInterval, new TEvSchemeShard::TEvWakeupToRunDataErasure); + IsDataErasureWakeupScheduled = true; + + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] ScheduleDataErasureWakeup: Interval# " << CurrentWakeupInterval); +} + +void TRootDataErasureManager::WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) { + Y_UNUSED(ev); + IsDataErasureWakeupScheduled = false; + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] WakeupToRunDataErasure: Timestamp# " << AppData(ctx)->TimeProvider->Now()); + SchemeShard->RunDataErasure(true); +} + +NOperationQueue::EStartStatus TRootDataErasureManager::StartDataErasure(const TPathId& pathId) { + UpdateMetrics(); + + auto ctx = SchemeShard->ActorContext(); + auto it = SchemeShard->SubDomains.find(pathId); + if (it == SchemeShard->SubDomains.end()) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Start] Failed to resolve subdomain info " + "for pathId# " << pathId + << " at schemeshard# " << SchemeShard->TabletID()); + + return NOperationQueue::EStartStatus::EOperationRemove; + } + + const auto& tenantSchemeShardId = it->second->GetTenantSchemeShardID(); + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Start] Data erasure " + "for pathId# " << pathId + << ", tenant schemeshard# " << tenantSchemeShardId + << ", next wakeup# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " tenants" + << ", running# " << Queue->RunningSize() << " tenants" + << " at schemeshard " << SchemeShard->TabletID()); + + std::unique_ptr request( + new TEvSchemeShard::TEvTenantDataErasureRequest(Generation)); + + ActivePipes[pathId] = SchemeShard->PipeClientCache->Send( + ctx, + ui64(tenantSchemeShardId), + request.release()); + + return NOperationQueue::EStartStatus::EOperationRunning; +} + +void TRootDataErasureManager::OnTimeout(const TPathId& pathId) { + UpdateMetrics(); + SchemeShard->TabletCounters->Cumulative()[COUNTER_DATA_ERASURE_TIMEOUT].Increment(1); + + ActivePipes.erase(pathId); + + auto ctx = SchemeShard->ActorContext(); + if (!SchemeShard->SubDomains.contains(pathId)) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Timeout] Failed to resolve subdomain info " + "for path# " << pathId + << " at schemeshard# " << SchemeShard->TabletID()); + return; + } + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Timeout] Data erasure timeouted " + "for pathId# " << pathId + << ", next wakeup in# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " tenants" + << ", running# " << Queue->RunningSize() << " tenants" + << " at schemeshard " << SchemeShard->TabletID()); + + // retry + Enqueue(pathId); +} + +void TRootDataErasureManager::Enqueue(const TPathId& pathId) { + auto ctx = SchemeShard->ActorContext(); + + if (Queue->Enqueue(pathId)) { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] [Enqueue] Enqueued pathId# " << pathId << " at schemeshard " << SchemeShard->TabletID()); + UpdateMetrics(); + } else { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] [Enqueue] Skipped or already exists pathId# " << pathId << " at schemeshard " << SchemeShard->TabletID()); + } +} + +void TRootDataErasureManager::HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Disconnect] Data erasure disconnect " + "to tablet: " << tabletId + << ", at schemeshard: " << SchemeShard->TabletID()); + + if (tabletId == BSC) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] HandleDisconnect resend request to BSC at schemeshard " << SchemeShard->TabletID()); + + SendRequestToBSC(); + return; + } + + const auto shardIdx = SchemeShard->GetShardIdx(tabletId); + if (!SchemeShard->ShardInfos.contains(shardIdx)) { + return; + } + + const auto& pathId = SchemeShard->ShardInfos.at(shardIdx).PathId; + if (!SchemeShard->TTLEnabledTables.contains(pathId)) { + return; + } + + const auto it = ActivePipes.find(pathId); + if (it == ActivePipes.end()) { + return; + } + + if (it->second != clientId) { + return; + } + + ActivePipes.erase(pathId); + StartDataErasure(pathId); +} + +void TRootDataErasureManager::OnDone(const TPathId& pathId, NIceDb::TNiceDb& db) { + auto duration = Queue->OnDone(pathId); + + auto ctx = SchemeShard->ActorContext(); + if (!SchemeShard->SubDomains.contains(pathId)) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Finished] Failed to resolve subdomain info " + "for pathId# " << pathId + << " in# " << duration.MilliSeconds() << " ms" + << ", next wakeup in# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " tenants" + << ", running# " << Queue->RunningSize() << " tenants" + << " at schemeshard " << SchemeShard->TabletID()); + } else { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [Finished] Data erasure completed " + "for pathId# " << pathId + << " in# " << duration.MilliSeconds() << " ms" + << ", next wakeup# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " tenants" + << ", running# " << Queue->RunningSize() << " tenants" + << " at schemeshard " << SchemeShard->TabletID()); + } + + ActivePipes.erase(pathId); + auto it = WaitingDataErasureTenants.find(pathId); + if (it != WaitingDataErasureTenants.end()) { + it->second = EStatus::COMPLETED; + db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Update(static_cast(it->second)); + } + + SchemeShard->TabletCounters->Cumulative()[COUNTER_DATA_ERASURE_OK].Increment(1); + UpdateMetrics(); + + bool isDataErasureCompleted = true; + for (const auto& [pathId, status] : WaitingDataErasureTenants) { + if (status == EStatus::IN_PROGRESS) { + isDataErasureCompleted = false; + break; + } + } + + if (isDataErasureCompleted) { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Data erasure in tenants is completed. Send request to BS controller"); + Status = EStatus::IN_PROGRESS_BSC; + db.Table().Key(Generation).Update(static_cast(Status)); + } +} + +void TRootDataErasureManager::OnDone(const TTabletId&, NIceDb::TNiceDb&) { + auto ctx = SchemeShard->ActorContext(); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RootDataErasureManager] [OnDone] Cannot execute in root schemeshard: " << SchemeShard->TabletID()); +} + +void TRootDataErasureManager::ScheduleRequestToBSC() { + if (IsRequestToBSCScheduled) { + return; + } + + auto ctx = SchemeShard->ActorContext(); + ctx.Schedule(DataErasureBSCInterval, new TEvSchemeShard::TEvWakeupToRunDataErasureBSC); + IsRequestToBSCScheduled = true; + + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] ScheduleRequestToBSC: Interval# " << DataErasureBSCInterval); +} + +void TRootDataErasureManager::SendRequestToBSC() { + auto ctx = SchemeShard->ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] SendRequestToBSC: Generation# " << Generation); + + IsRequestToBSCScheduled = false; + std::unique_ptr request( + new TEvBlobStorage::TEvControllerShredRequest(Generation)); + SchemeShard->PipeClientCache->Send(ctx, MakeBSControllerID(), request.release()); +} + +void TRootDataErasureManager::Complete() { + Status = EStatus::COMPLETED; + auto ctx = SchemeShard->ActorContext(); + FinishTime = AppData(ctx)->TimeProvider->Now(); + TDuration dataErasureDuration = FinishTime - StartTime; + if (dataErasureDuration > DataErasureInterval) { + if (!IsManualStartup) { + SchemeShard->RunDataErasure(true); + } + } else { + CurrentWakeupInterval = DataErasureInterval - dataErasureDuration; + ScheduleDataErasureWakeup(); + } + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Complete: Generation# " << Generation + << ", duration# " << dataErasureDuration.Seconds() << " s"); +} + +bool TRootDataErasureManager::Restore(NIceDb::TNiceDb& db) { + { + auto rowset = db.Table().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + if (rowset.EndOfSet()) { + Status = EStatus::UNSPECIFIED; + } else { + Generation = 0; + Status = EStatus::UNSPECIFIED; + while (!rowset.EndOfSet()) { + ui64 generation = rowset.GetValue(); + if (generation >= Generation) { + Generation = generation; + StartTime = TInstant::FromValue(rowset.GetValue()); + ui32 statusValue = rowset.GetValue(); + if (statusValue >= static_cast(EStatus::UNSPECIFIED) && + statusValue <= static_cast(EStatus::IN_PROGRESS_BSC)) { + Status = static_cast(statusValue); + } + } + + if (!rowset.Next()) { + return false; + } + } + if (Status == EStatus::UNSPECIFIED || Status == EStatus::COMPLETED) { + auto ctx = SchemeShard->ActorContext(); + TDuration interval = AppData(ctx)->TimeProvider->Now() - StartTime; + if (interval > DataErasureInterval) { + CurrentWakeupInterval = TDuration::Zero(); + } else { + CurrentWakeupInterval = DataErasureInterval - interval; + } + } + } + } + + ui32 numberDataErasureTenantsInRunning = 0; + { + auto rowset = db.Table().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + TOwnerId ownerPathId = rowset.GetValue(); + TLocalPathId localPathId = rowset.GetValue(); + TPathId pathId(ownerPathId, localPathId); + Y_VERIFY_S(SchemeShard->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); + TPathElement::TPtr path = SchemeShard->PathsById.at(pathId); + Y_VERIFY_S(path->IsDomainRoot(), "Path is not a subdomain, pathId: " << pathId); + + Y_ABORT_UNLESS(SchemeShard->SubDomains.contains(pathId)); + + ui32 statusValue = rowset.GetValue(); + EStatus status = EStatus::COMPLETED; + if (statusValue >= static_cast(EStatus::UNSPECIFIED) && + statusValue <= static_cast(EStatus::IN_PROGRESS_BSC)) { + status = static_cast(statusValue); + } + + WaitingDataErasureTenants[pathId] = status; + if (status == EStatus::IN_PROGRESS) { + numberDataErasureTenantsInRunning++; + } + + if (!rowset.Next()) { + return false; + } + } + if (Status == EStatus::IN_PROGRESS && (WaitingDataErasureTenants.empty() || numberDataErasureTenantsInRunning == 0)) { + Status = EStatus::IN_PROGRESS_BSC; + } + } + + auto ctx = SchemeShard->ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[RootDataErasureManager] Restore: Generation# " << Generation + << ", Status# " << static_cast(Status) + << ", WakeupInterval# " << CurrentWakeupInterval.Seconds() << " s" + << ", NumberDataErasureTenantsInRunning# " << numberDataErasureTenantsInRunning); + + return true; +} + +void TRootDataErasureManager::Remove(const TPathId& pathId) { + WaitingDataErasureTenants[pathId] = EStatus::COMPLETED; + bool isDataErasureCompleted = true; + for (const auto& [pathId, status] : WaitingDataErasureTenants) { + if (status == EStatus::IN_PROGRESS) { + isDataErasureCompleted = false; + break; + } + } + + if (isDataErasureCompleted) { + SendRequestToBSC(); + } +} + +void TRootDataErasureManager::UpdateMetrics() { + SchemeShard->TabletCounters->Simple()[COUNTER_DATA_ERASURE_QUEUE_SIZE].Set(Queue->Size()); + SchemeShard->TabletCounters->Simple()[COUNTER_DATA_ERASURE_QUEUE_RUNNING].Set(Queue->RunningSize()); +} + +TRootDataErasureManager::TQueue::TConfig TRootDataErasureManager::ConvertConfig(const NKikimrConfig::TDataErasureConfig& config) { + TQueue::TConfig queueConfig; + queueConfig.IsCircular = false; + queueConfig.MaxRate = config.GetMaxRate(); + queueConfig.InflightLimit = config.GetInflightLimit(); + queueConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); + + return queueConfig; +} + +struct TSchemeShard::TTxDataErasureManagerInit : public TSchemeShard::TRwTxBase { + TTxDataErasureManagerInit(TSelf* self) + : TRwTxBase(self) + {} + + TTxType GetTxType() const override { return TXTYPE_DATA_ERASURE_INIT; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxDataErasureManagerInit Execute at schemeshard: " << Self->TabletID()); + NIceDb::TNiceDb db(txc.DB); + Self->DataErasureManager->SetStatus(TDataErasureManager::EStatus::COMPLETED); + db.Table().Key(0).Update(static_cast(Self->DataErasureManager->GetStatus()), AppData(ctx)->TimeProvider->Now().MicroSeconds()); + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxDataErasureManagerInit Complete at schemeshard: " << Self->TabletID()); + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxDataErasureManagerInit() { + return new TTxDataErasureManagerInit(this); +} + +struct TSchemeShard::TTxRunDataErasure : public TSchemeShard::TRwTxBase { + bool IsNewDataErasure; + bool NeedSendRequestToBSC = false; + + TTxRunDataErasure(TSelf *self, bool isNewDataErasure) + : TRwTxBase(self) + , IsNewDataErasure(isNewDataErasure) + {} + + TTxType GetTxType() const override { return TXTYPE_RUN_DATA_ERASURE; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxRunDataErasure Execute at schemeshard: " << Self->TabletID()); + + NIceDb::TNiceDb db(txc.DB); + auto& dataErasureManager = Self->DataErasureManager; + if (IsNewDataErasure) { + dataErasureManager->ClearOperationQueue(); + + dataErasureManager->ClearWaitingDataErasureRequests(db); + dataErasureManager->IncGeneration(); + dataErasureManager->Run(db); + } + if (Self->DataErasureManager->GetStatus() == TDataErasureManager::EStatus::IN_PROGRESS_BSC) { + NeedSendRequestToBSC = true; + } + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxRunDataErasure Complete at schemeshard: " << Self->TabletID() + << ", NeedSendRequestToBSC# " << (NeedSendRequestToBSC ? "true" : "false")); + + if (NeedSendRequestToBSC) { + Self->DataErasureManager->SendRequestToBSC(); + } + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxRunDataErasure(bool isNewDataErasure) { + return new TTxRunDataErasure(this, isNewDataErasure); +} + +struct TSchemeShard::TTxCompleteDataErasureTenant : public TSchemeShard::TRwTxBase { + const TEvSchemeShard::TEvTenantDataErasureResponse::TPtr Ev; + bool NeedSendRequestToBSC = false; + + TTxCompleteDataErasureTenant(TSelf* self, const TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev) + : TRwTxBase(self) + , Ev(std::move(ev)) + {} + + TTxType GetTxType() const override { return TXTYPE_COMPLETE_DATA_ERASURE_TENANT; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureTenant Execute at schemeshard: " << Self->TabletID()); + + const auto& record = Ev->Get()->Record; + auto& manager = Self->DataErasureManager; + const ui64 completedGeneration = record.GetGeneration(); + if (completedGeneration != manager->GetGeneration()) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureTenant Unknown generation#" << completedGeneration << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID()); + return; + } + + NIceDb::TNiceDb db(txc.DB); + auto pathId = TPathId( + record.GetPathId().GetOwnerId(), + record.GetPathId().GetLocalId()); + manager->OnDone(pathId, db); + if (manager->GetStatus() == TDataErasureManager::EStatus::IN_PROGRESS_BSC) { + NeedSendRequestToBSC = true; + } + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureTenant Complete at schemeshard: " << Self->TabletID() + << ", NeedSendRequestToBSC# " << (NeedSendRequestToBSC ? "true" : "false")); + if (NeedSendRequestToBSC) { + Self->DataErasureManager->SendRequestToBSC(); + } + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureTenant(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev) { + return new TTxCompleteDataErasureTenant(this, ev); +} + +struct TSchemeShard::TTxCompleteDataErasureBSC : public TSchemeShard::TRwTxBase { + const TEvBlobStorage::TEvControllerShredResponse::TPtr Ev; + bool NeedScheduleRequestToBSC = false; + + TTxCompleteDataErasureBSC(TSelf* self, const TEvBlobStorage::TEvControllerShredResponse::TPtr& ev) + : TRwTxBase(self) + , Ev(std::move(ev)) + {} + + TTxType GetTxType() const override { return TXTYPE_COMPLETE_DATA_ERASURE_BSC; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureBSC Execute at schemeshard: " << Self->TabletID()); + + const auto& record = Ev->Get()->Record; + auto& manager = Self->DataErasureManager; + if (record.GetCurrentGeneration() != manager->GetGeneration()) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureBSC Unknown generation#" << record.GetCurrentGeneration() << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID()); + return; + } + + NIceDb::TNiceDb db(txc.DB); + if (record.GetCompleted()) { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Data shred in BSC is completed"); + manager->Complete(); + db.Table().Key(Self->DataErasureManager->GetGeneration()).Update(static_cast(Self->DataErasureManager->GetStatus())); + } else { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxCompleteDataErasureBSC: Progress data shred in BSC " << record.GetProgress10k()); + NeedScheduleRequestToBSC = true; + } + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureBSC Complete at schemeshard: " << Self->TabletID() + << ", NeedScheduleRequestToBSC# " << (NeedScheduleRequestToBSC ? "true" : "false")); + + if (NeedScheduleRequestToBSC) { + Self->DataErasureManager->ScheduleRequestToBSC(); + } + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureBSC(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev) { + return new TTxCompleteDataErasureBSC(this, ev); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure.cpp b/ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure.cpp deleted file mode 100644 index c31bf8cc82ba..000000000000 --- a/ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure.cpp +++ /dev/null @@ -1,224 +0,0 @@ -#include "schemeshard_impl.h" - -namespace NKikimr::NSchemeShard { - -void TSchemeShard::Handle(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev, const TActorContext& ctx) { - Execute(CreateTxRunTenantDataErasure(ev), ctx); -} - -NOperationQueue::EStartStatus TSchemeShard::StartTenantDataErasure(const TShardIdx& shardIdx) { - UpdateTenantDataErasureQueueMetrics(); - - auto ctx = ActorContext(); - - auto it = ShardInfos.find(shardIdx); - if (it == ShardInfos.end()) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Start] Failed to resolve shard info " - "for data erasure# " << shardIdx - << " at schemeshard# " << TabletID()); - - return NOperationQueue::EStartStatus::EOperationRemove; - } - - const auto& datashardId = it->second.TabletID; - const auto& pathId = it->second.PathId; - - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Start] Data erasure " - "for pathId# " << pathId << ", datashard# " << datashardId - << ", next wakeup# " << TenantDataErasureQueue->GetWakeupDelta() - << ", rate# " << TenantDataErasureQueue->GetRate() - << ", in queue# " << TenantDataErasureQueue->Size() << " shards" - << ", running# " << TenantDataErasureQueue->RunningSize() << " shards" - << " at schemeshard " << TabletID()); - - std::unique_ptr request( - new TEvDataShard::TEvForceDataCleanup(DataErasureGeneration)); - - RunningDataErasureShards[shardIdx] = PipeClientCache->Send( - ctx, - ui64(datashardId), - request.release()); - - return NOperationQueue::EStartStatus::EOperationRunning; -} - -void TSchemeShard::OnTenantDataErasureTimeout(const TShardIdx& shardIdx) { - UpdateTenantDataErasureQueueMetrics(); - TabletCounters->Cumulative()[COUNTER_TENANT_DATA_ERASURE_TIMEOUT].Increment(1); - - RunningDataErasureShards.erase(shardIdx); - - auto ctx = ActorContext(); - - auto it = ShardInfos.find(shardIdx); - if (it == ShardInfos.end()) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Timeout] Failed to resolve shard info " - "for timeout data erasure# " << shardIdx - << " at schemeshard# " << TabletID()); - return; - } - - const auto& datashardId = it->second.TabletID; - const auto& pathId = it->second.PathId; - - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Timeout] Data erasure timeout " - "for pathId# " << pathId << ", datashard# " << datashardId - << ", next wakeup# " << TenantDataErasureQueue->GetWakeupDelta() - << ", in queue# " << TenantDataErasureQueue->Size() << " shards" - << ", running# " << TenantDataErasureQueue->RunningSize() << " shards" - << " at schemeshard " << TabletID()); - - // retry - EnqueueTenantDataErasure(shardIdx); -} - -void TSchemeShard::Handle(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - - const ui64 completedGeneration = record.GetDataCleanupGeneration(); - if (completedGeneration != DataErasureGeneration) { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Handle TEvForceDataCleanupResult: Unknown generation#" << completedGeneration << ", Expected gen# " << DataErasureGeneration << " at schemestard: " << TabletID()); - return; - } - - const TTabletId tabletId(record.GetTabletId()); - const TShardIdx shardIdx = GetShardIdx(tabletId); - const auto it = ShardInfos.find(shardIdx); - - auto duration = TenantDataErasureQueue->OnDone(shardIdx); - - if (shardIdx == InvalidShardIdx) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Finished] Failed to resolve shard info " - "for pathId# " << (it != ShardInfos.end() ? it->second.PathId.ToString() : "") << ", datashard# " << tabletId - << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() - << ", next wakeup in# " << TenantDataErasureQueue->GetWakeupDelta() - << ", rate# " << TenantDataErasureQueue->GetRate() - << ", in queue# " << TenantDataErasureQueue->Size() << " shards" - << ", running# " << TenantDataErasureQueue->RunningSize() << " shards" - << " at schemeshard " << TabletID()); - } else { - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Finished] Data erasure is completed " - "for pathId# " << (it != ShardInfos.end() ? it->second.PathId.ToString() : "") << ", datashard# " << tabletId - << ", shardIdx# " << shardIdx - << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() - << ", next wakeup in# " << TenantDataErasureQueue->GetWakeupDelta() - << ", rate# " << TenantDataErasureQueue->GetRate() - << ", in queue# " << TenantDataErasureQueue->Size() << " shards" - << ", running# " << TenantDataErasureQueue->RunningSize() << " shards" - << " at schemeshard " << TabletID()); - } - - RunningDataErasureShards.erase(shardIdx); - - TabletCounters->Cumulative()[COUNTER_TENANT_DATA_ERASURE_OK].Increment(1); - UpdateTenantDataErasureQueueMetrics(); - - if (RunningDataErasureShards.empty()) { - std::unique_ptr response( - new TEvSchemeShard::TEvTenantDataErasureResponse(ParentDomainId, DataErasureGeneration, TEvSchemeShard::TEvTenantDataErasureResponse::EStatus::COMPLETED)); - - const ui64 rootSchemeshard = ParentDomainId.OwnerId; - - PipeClientCache->Send( - ctx, - ui64(rootSchemeshard), - response.release()); - } -} - -void TSchemeShard::TenantDataErasureHandleDisconnect(TTabletId tabletId, const TActorId& clientId) { - auto tabletIt = TabletIdToShardIdx.find(tabletId); - if (tabletIt == TabletIdToShardIdx.end()) - return; // just sanity check - const auto& shardIdx = tabletIt->second; - - auto it = RunningDataErasureShards.find(shardIdx); - if (it == RunningDataErasureShards.end()) - return; - - if (it->second != clientId) - return; - - RunningDataErasureShards.erase(it); - - StartTenantDataErasure(shardIdx); -} - -void TSchemeShard::EnqueueTenantDataErasure(const TShardIdx& shardIdx) { - if (!TenantDataErasureQueue) - return; - - auto ctx = ActorContext(); - - if (TenantDataErasureQueue->Enqueue(shardIdx)) { - LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasure] [Enqueue] Enqueued shard# " << shardIdx << " at schemeshard " << TabletID()); - UpdateTenantDataErasureQueueMetrics(); - } else { - LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[TenantDataErasure] [Enqueue] Skipped or already exists shard# " << shardIdx << " at schemeshard " << TabletID()); - } -} -void TSchemeShard::RemoveTenantDataErasure(const TShardIdx& shardIdx) { - if (!TenantDataErasureQueue) - return; - - RunningDataErasureShards.erase(shardIdx); - TenantDataErasureQueue->Remove(shardIdx); - UpdateTenantDataErasureQueueMetrics(); -} - -void TSchemeShard::UpdateTenantDataErasureQueueMetrics() { - if (!TenantDataErasureQueue) { - return; - } - - TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_QUEUE_SIZE].Set(TenantDataErasureQueue->Size()); - TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_QUEUE_RUNNING].Set(TenantDataErasureQueue->RunningSize()); -} - -struct TSchemeShard::TTxRunTenantDataErasure : public TSchemeShard::TRwTxBase { - TEvSchemeShard::TEvTenantDataErasureRequest::TPtr Ev; - - TTxRunTenantDataErasure(TSelf *self, TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev) - : TRwTxBase(self) - , Ev(std::move(ev)) - {} - - TTxType GetTxType() const override { return TXTYPE_RUN_TENANT_DATA_ERASURE; } - - void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxRunTenantDataErasure Execute at schemestard: " << Self->TabletID()); - if (Self->IsDomainSchemeShard) { - LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Request] Cannot run data erasure on root schemeshard"); - return; - } - - NIceDb::TNiceDb db(txc.DB); - const auto& record = Ev->Get()->Record; - if (Self->DataErasureGeneration < record.GetGeneration()) { - Self->DataErasureGeneration = record.GetGeneration(); - Self->TenantDataErasureQueue->Clear(); - Self->RunningDataErasureShards.clear(); - for (const auto& [shardIdx, shardInfo] : Self->ShardInfos) { - if (shardInfo.TabletType == ETabletType::DataShard) { - Self->EnqueueTenantDataErasure(shardIdx); // forward generation - } - } - // write new DataErasureGeneration to local DB - } - } - - void DoComplete(const TActorContext& ctx) override { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxRunTenantDataErasure Execute at schemestard: " << Self->TabletID()); - } -}; - -NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxRunTenantDataErasure(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev) { - return new TTxRunTenantDataErasure(this, ev); -} - -} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure_manager.cpp b/ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure_manager.cpp new file mode 100644 index 000000000000..600cd35a3f64 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__tenant_data_erasure_manager.cpp @@ -0,0 +1,531 @@ +#include "schemeshard__data_erasure_manager.h" + +#include + +namespace NKikimr::NSchemeShard { + +TTenantDataErasureManager::TStarter::TStarter(TTenantDataErasureManager* const manager) + : Manager(manager) +{} + +NOperationQueue::EStartStatus TTenantDataErasureManager::TStarter::StartOperation(const TShardIdx& shardIdx) { + return Manager->StartDataErasure(shardIdx); +} + +void TTenantDataErasureManager::TStarter::OnTimeout(const TShardIdx& shardIdx) { + Manager->OnTimeout(shardIdx); +} + +TTenantDataErasureManager::TTenantDataErasureManager(TSchemeShard* const schemeShard, const NKikimrConfig::TDataErasureConfig& config) + : TDataErasureManager(schemeShard) + , Starter(this) + , Queue(new TQueue(ConvertConfig(config), Starter)) +{ + const auto ctx = SchemeShard->ActorContext(); + ctx.RegisterWithSameMailbox(Queue); + + const auto& tenantDataErasureConfig = config.GetTenantDataErasureConfig(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Created: Timeout# " << tenantDataErasureConfig.GetTimeoutSeconds() + << ", Rate# " << Queue->GetRate() + << ", InflightLimit# " << tenantDataErasureConfig.GetInflightLimit()); +} + +void TTenantDataErasureManager::UpdateConfig(const NKikimrConfig::TDataErasureConfig& config) { + TTenantDataErasureManager::TQueue::TConfig queueConfig = ConvertConfig(config); + Queue->UpdateConfig(queueConfig); + + const auto ctx = SchemeShard->ActorContext(); + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Config updated: Timeout# " << queueConfig.Timeout + << ", Rate# " << Queue->GetRate() + << ", InflightLimit# " << queueConfig.InflightLimit); +} + +void TTenantDataErasureManager::Start() { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Start: Status# " << static_cast(Status)); + + Queue->Start(); + if (Status == EStatus::COMPLETED) { + SendResponseToRootSchemeShard(); + } else if (Status == EStatus::IN_PROGRESS) { + ClearOperationQueue(); + Continue(); + } +} + +void TTenantDataErasureManager::Stop() { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Stop"); + + Queue->Stop(); +} + +void TTenantDataErasureManager::ClearOperationQueue() { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Clear operation queue and active pipes"); + + Queue->Clear(); + ActivePipes.clear(); +} + +void TTenantDataErasureManager::WakeupToRunDataErasure(TEvSchemeShard::TEvWakeupToRunDataErasure::TPtr& ev, const NActors::TActorContext& ctx) { + Y_UNUSED(ev); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [WakeupToRunDataErasure] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); +} + +void TTenantDataErasureManager::ClearWaitingDataErasureRequests(NIceDb::TNiceDb& db) { + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Clear WaitingDataErasureShards: Size# " << WaitingDataErasureShards.size()); + + for (const auto& [shardIdx, status] : WaitingDataErasureShards) { + db.Table().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Delete(); + } + ClearWaitingDataErasureRequests(); +} + +void TTenantDataErasureManager::ClearWaitingDataErasureRequests() { + WaitingDataErasureShards.clear(); +} + +void TTenantDataErasureManager::Run(NIceDb::TNiceDb& db) { + Status = EStatus::IN_PROGRESS; + for (const auto& [shardIdx, shardInfo] : SchemeShard->ShardInfos) { + if (shardInfo.TabletType == ETabletType::DataShard) { + Enqueue(shardIdx); // forward generation + WaitingDataErasureShards[shardIdx] = EStatus::IN_PROGRESS; + db.Table().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update(static_cast(WaitingDataErasureShards[shardIdx])); + } + } + if (WaitingDataErasureShards.empty()) { + Status = EStatus::COMPLETED; + } + db.Table().Key(Generation).Update(static_cast(Status)); + + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Run: Queue.Size# " << Queue->Size() + << ", WaitingDataErasureShards.size# " << WaitingDataErasureShards.size() + << ", Status# " << static_cast(Status)); +} + +void TTenantDataErasureManager::Continue() { + for (const auto& [shardIdx, status] : WaitingDataErasureShards) { + if (status == EStatus::IN_PROGRESS) { + Enqueue(shardIdx); // forward generation + } + } + + const auto ctx = SchemeShard->ActorContext(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Continue: Queue.Size# " << Queue->Size() + << ", Status# " << static_cast(Status)); +} + +NOperationQueue::EStartStatus TTenantDataErasureManager::StartDataErasure(const TShardIdx& shardIdx) { + UpdateMetrics(); + + auto ctx = SchemeShard->ActorContext(); + auto it = SchemeShard->ShardInfos.find(shardIdx); + if (it == SchemeShard->ShardInfos.end()) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Start] Failed to resolve shard info " + "for data erasure# " << shardIdx + << " at schemeshard# " << SchemeShard->TabletID()); + + return NOperationQueue::EStartStatus::EOperationRemove; + } + + const auto& datashardId = it->second.TabletID; + const auto& pathId = it->second.PathId; + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Start] Data erasure " + "for pathId# " << pathId << ", datashard# " << datashardId + << ", next wakeup# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " shards" + << ", running# " << Queue->RunningSize() << " shards" + << " at schemeshard " << SchemeShard->TabletID()); + + std::unique_ptr request( + new TEvDataShard::TEvForceDataCleanup(Generation)); + + ActivePipes[shardIdx] = SchemeShard->PipeClientCache->Send( + ctx, + ui64(datashardId), + request.release()); + + return NOperationQueue::EStartStatus::EOperationRunning; +} + +void TTenantDataErasureManager::OnTimeout(const TShardIdx& shardIdx) { + UpdateMetrics(); + SchemeShard->TabletCounters->Cumulative()[COUNTER_TENANT_DATA_ERASURE_TIMEOUT].Increment(1); + + ActivePipes.erase(shardIdx); + + auto ctx = SchemeShard->ActorContext(); + auto it = SchemeShard->ShardInfos.find(shardIdx); + if (it == SchemeShard->ShardInfos.end()) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Timeout] Failed to resolve shard info " + "for timeout data erasure# " << shardIdx + << " at schemeshard# " << SchemeShard->TabletID()); + return; + } + + const auto& datashardId = it->second.TabletID; + const auto& pathId = it->second.PathId; + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Timeout] Data erasure timeout " + "for pathId# " << pathId << ", datashard# " << datashardId + << ", next wakeup# " << Queue->GetWakeupDelta() + << ", in queue# " << Queue->Size() << " shards" + << ", running# " << Queue->RunningSize() << " shards" + << " at schemeshard " << SchemeShard->TabletID()); + + // retry + Enqueue(shardIdx); +} + +void TTenantDataErasureManager::Enqueue(const TShardIdx& shardIdx) { + auto ctx = SchemeShard->ActorContext(); + + if (Queue->Enqueue(shardIdx)) { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] [Enqueue] Enqueued shard# " << shardIdx << " at schemeshard " << SchemeShard->TabletID()); + UpdateMetrics(); + } else { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] [Enqueue] Skipped or already exists shard# " << shardIdx << " at schemeshard " << SchemeShard->TabletID()); + } +} + +void TTenantDataErasureManager::HandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Disconnect] Data erasure disconnect " + "to tablet: " << tabletId + << ", at schemeshard: " << SchemeShard->TabletID()); + + if (tabletId == TTabletId(SchemeShard->ParentDomainId.OwnerId)) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] HandleDisconnect resend response to root schemeshard at schemeshard " << SchemeShard->TabletID()); + + SendResponseToRootSchemeShard(); + return; + } + auto tabletIt = SchemeShard->TabletIdToShardIdx.find(tabletId); + if (tabletIt == SchemeShard->TabletIdToShardIdx.end()) { + return; // just sanity check + } + + const auto& shardIdx = tabletIt->second; + auto it = ActivePipes.find(shardIdx); + if (it == ActivePipes.end()) { + return; + } + + if (it->second != clientId) { + return; + } + + ActivePipes.erase(it); + StartDataErasure(shardIdx); +} + +void TTenantDataErasureManager::OnDone(const TPathId&, NIceDb::TNiceDb&) { + auto ctx = SchemeShard->ActorContext(); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] [OnDone] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); +} + +void TTenantDataErasureManager::OnDone(const TTabletId& tabletId, NIceDb::TNiceDb& db) { + const TShardIdx shardIdx = SchemeShard->GetShardIdx(tabletId); + const auto it = SchemeShard->ShardInfos.find(shardIdx); + + auto duration = Queue->OnDone(shardIdx); + + auto ctx = SchemeShard->ActorContext(); + if (shardIdx == InvalidShardIdx) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Finished] Failed to resolve shard info " + "for pathId# " << (it != SchemeShard->ShardInfos.end() ? it->second.PathId.ToString() : "") << ", datashard# " << tabletId + << " in# " << duration.MilliSeconds() << " ms" + << ", next wakeup in# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " shards" + << ", running# " << Queue->RunningSize() << " shards" + << " at schemeshard " << SchemeShard->TabletID()); + } else { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasureManager] [Finished] Data erasure is completed " + "for pathId# " << (it != SchemeShard->ShardInfos.end() ? it->second.PathId.ToString() : "") << ", datashard# " << tabletId + << ", shardIdx# " << shardIdx + << " in# " << duration.MilliSeconds() << " ms" + << ", next wakeup in# " << Queue->GetWakeupDelta() + << ", rate# " << Queue->GetRate() + << ", in queue# " << Queue->Size() << " shards" + << ", running# " << Queue->RunningSize() << " shards" + << " at schemeshard " << SchemeShard->TabletID()); + } + + ActivePipes.erase(shardIdx); + { + auto it = WaitingDataErasureShards.find(shardIdx); + if (it != WaitingDataErasureShards.end()) { + it->second = EStatus::COMPLETED; + } + db.Table().Key(shardIdx.GetOwnerId(), shardIdx.GetLocalId()).Update(static_cast(it->second)); + } + + SchemeShard->TabletCounters->Cumulative()[COUNTER_TENANT_DATA_ERASURE_OK].Increment(1); + UpdateMetrics(); + + bool isTenantDataErasureCompleted = true; + for (const auto& [shardIdx, status] : WaitingDataErasureShards) { + if (status == EStatus::IN_PROGRESS) { + isTenantDataErasureCompleted = false; + } + } + if (isTenantDataErasureCompleted) { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Data erasure in shards is completed. Send response to root schemeshard"); + Complete(); + db.Table().Key(Generation).Update(static_cast(Status)); + } +} + +void TTenantDataErasureManager::ScheduleRequestToBSC() { + auto ctx = SchemeShard->ActorContext(); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] [ScheduleRequestToBSC] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); +} + +void TTenantDataErasureManager::SendRequestToBSC() { + auto ctx = SchemeShard->ActorContext(); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] [SendRequestToBSC] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); +} + +void TTenantDataErasureManager::Complete() { + Status = EStatus::COMPLETED; + + auto ctx = SchemeShard->ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Complete: Generation# " << Generation); +} + +bool TTenantDataErasureManager::Restore(NIceDb::TNiceDb& db) { + { + auto rowset = db.Table().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + ui64 generation = rowset.GetValue(); + if (generation >= Generation) { + Generation = generation; + ui32 statusValue = rowset.GetValue(); + Status = EStatus::UNSPECIFIED; + if (statusValue >= static_cast(EStatus::UNSPECIFIED) && + statusValue <= static_cast(EStatus::IN_PROGRESS_BSC)) { + Status = static_cast(statusValue); + } + } + + if (!rowset.Next()) { + return false; + } + } + } + + ui64 numberDataErasureShardsInRunning = 0; + { + auto rowset = db.Table().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + TOwnerId ownerId = rowset.GetValue(); + TLocalShardIdx localShardId = rowset.GetValue(); + TShardIdx shardId(ownerId, localShardId); + + ui32 statusValue = rowset.GetValue(); + EStatus status = EStatus::COMPLETED; + if (statusValue >= static_cast(EStatus::UNSPECIFIED) && + statusValue <= static_cast(EStatus::IN_PROGRESS_BSC)) { + status = static_cast(statusValue); + } + WaitingDataErasureShards[shardId] = status; + if (status == EStatus::IN_PROGRESS) { + numberDataErasureShardsInRunning++; + } + + if (!rowset.Next()) { + return false; + } + } + if (Status == EStatus::IN_PROGRESS && (WaitingDataErasureShards.empty() || numberDataErasureShardsInRunning == 0)) { + Status = EStatus::COMPLETED; + } + } + + auto ctx = SchemeShard->ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] Restore: Generation# " << Generation + << ", Status# " << static_cast(Status) + << ", NumberDataErasureShardsInRunning# " << numberDataErasureShardsInRunning); + + return true; +} + +void TTenantDataErasureManager::Remove(const TPathId&) { + auto ctx = SchemeShard->ActorContext(); + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] [Remove] Cannot execute in tenant schemeshard: " << SchemeShard->TabletID()); +} + +void TTenantDataErasureManager::UpdateMetrics() { + SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_QUEUE_SIZE].Set(Queue->Size()); + SchemeShard->TabletCounters->Simple()[COUNTER_TENANT_DATA_ERASURE_QUEUE_RUNNING].Set(Queue->RunningSize()); +} + +void TTenantDataErasureManager::SendResponseToRootSchemeShard() { + auto ctx = SchemeShard->ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[TenantDataErasureManager] SendResponseToRootSchemeShard: Generation# " << Generation + << ", Status# " << static_cast(Status) + << ", RootSchemeshard# " << SchemeShard->ParentDomainId.OwnerId); + + std::unique_ptr response( + new TEvSchemeShard::TEvTenantDataErasureResponse(SchemeShard->ParentDomainId, Generation, TEvSchemeShard::TEvTenantDataErasureResponse::EStatus::COMPLETED)); + + const ui64 rootSchemeshard = SchemeShard->ParentDomainId.OwnerId; + SchemeShard->PipeClientCache->Send( + ctx, + ui64(rootSchemeshard), + response.release()); +} + + +TTenantDataErasureManager::TQueue::TConfig TTenantDataErasureManager::ConvertConfig(const NKikimrConfig::TDataErasureConfig& config) { + TQueue::TConfig queueConfig; + const auto& tenantDataErasureConfig = config.GetTenantDataErasureConfig(); + queueConfig.IsCircular = false; + queueConfig.MaxRate = tenantDataErasureConfig.GetMaxRate(); + queueConfig.InflightLimit = tenantDataErasureConfig.GetInflightLimit(); + queueConfig.Timeout = TDuration::Seconds(tenantDataErasureConfig.GetTimeoutSeconds()); + + return queueConfig; +} + +struct TSchemeShard::TTxRunTenantDataErasure : public TSchemeShard::TRwTxBase { + TEvSchemeShard::TEvTenantDataErasureRequest::TPtr Ev; + bool NeedResponseComplete = false; + + TTxRunTenantDataErasure(TSelf *self, TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev) + : TRwTxBase(self) + , Ev(std::move(ev)) + {} + + TTxType GetTxType() const override { return TXTYPE_RUN_DATA_ERASURE_TENANT; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxRunTenantDataErasure Execute at schemestard: " << Self->TabletID()); + + if (Self->IsDomainSchemeShard) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[TenantDataErasure] [Request] Cannot run data erasure on root schemeshard"); + return; + } + + NIceDb::TNiceDb db(txc.DB); + const auto& record = Ev->Get()->Record; + auto& dataErasureManager = Self->DataErasureManager; + if (dataErasureManager->GetGeneration() < record.GetGeneration()) { + dataErasureManager->SetGeneration(record.GetGeneration()); + dataErasureManager->ClearOperationQueue(); + dataErasureManager->ClearWaitingDataErasureRequests(db); + dataErasureManager->Run(db); + } + if (Self->DataErasureManager->GetGeneration() == record.GetGeneration() && Self->DataErasureManager->GetStatus() == TDataErasureManager::EStatus::COMPLETED) { + NeedResponseComplete = true; + } + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxRunTenantDataErasure Complete at schemestard: " << Self->TabletID() + << ", NeedResponseComplete# " << (NeedResponseComplete ? "true" : "false")); + + if (NeedResponseComplete) { + std::unique_ptr response( + new TEvSchemeShard::TEvTenantDataErasureResponse(Self->ParentDomainId, Self->DataErasureManager->GetGeneration(), TEvSchemeShard::TEvTenantDataErasureResponse::EStatus::COMPLETED)); + + const ui64 rootSchemeshard = Self->ParentDomainId.OwnerId; + Self->PipeClientCache->Send( + ctx, + ui64(rootSchemeshard), + response.release()); + } + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxRunTenantDataErasure(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev) { + return new TTxRunTenantDataErasure(this, ev); +} + +struct TSchemeShard::TTxCompleteDataErasureShard : public TSchemeShard::TRwTxBase { + TEvDataShard::TEvForceDataCleanupResult::TPtr Ev; + bool NeedResponseComplete = false; + + TTxCompleteDataErasureShard(TSelf *self, TEvDataShard::TEvForceDataCleanupResult::TPtr& ev) + : TRwTxBase(self) + , Ev(std::move(ev)) + {} + + TTxType GetTxType() const override { return TXTYPE_RUN_DATA_ERASURE_TENANT ; } + + void DoExecute(TTransactionContext& txc, const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureShard Execute at schemestard: " << Self->TabletID()); + const auto& record = Ev->Get()->Record; + + auto& manager = Self->DataErasureManager; + const ui64 cleanupGeneration = record.GetDataCleanupGeneration(); + if (cleanupGeneration != manager->GetGeneration()) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureShard: Unknown generation#" << cleanupGeneration + << ", Expected gen# " << manager->GetGeneration() << " at schemestard: " << Self->TabletID()); + return; + } + NIceDb::TNiceDb db(txc.DB); + manager->OnDone(TTabletId(record.GetTabletId()), db); + if (Self->DataErasureManager->GetStatus() == TDataErasureManager::EStatus::COMPLETED) { + NeedResponseComplete = true; + } + } + + void DoComplete(const TActorContext& ctx) override { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TTxCompleteDataErasureShard Complete at schemestard: " << Self->TabletID() + << ", NeedResponseComplete# " << (NeedResponseComplete ? "true" : "false")); + + if (NeedResponseComplete) { + std::unique_ptr response( + new TEvSchemeShard::TEvTenantDataErasureResponse(Self->ParentDomainId, Self->DataErasureManager->GetGeneration(), TEvSchemeShard::TEvTenantDataErasureResponse::EStatus::COMPLETED)); + + const ui64 rootSchemeshard = Self->ParentDomainId.OwnerId; + Self->PipeClientCache->Send( + ctx, + ui64(rootSchemeshard), + response.release()); + } + } +}; + +NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxCompleteDataErasureShard(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev) { + return new TTxCompleteDataErasureShard(this, ev); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_data_erasure_scheduler.cpp b/ydb/core/tx/schemeshard/schemeshard_data_erasure_scheduler.cpp deleted file mode 100644 index eb92314827e2..000000000000 --- a/ydb/core/tx/schemeshard/schemeshard_data_erasure_scheduler.cpp +++ /dev/null @@ -1,94 +0,0 @@ -#include "schemeshard_data_erasure_scheduler.h" -#include "schemeshard.h" - -namespace NKikimr::NSchemeShard { - -TDataErasureScheduler::TDataErasureScheduler(const NActors::TActorId& schemeShardId, const TDuration& dataErasureInterval, const TDuration& dataErasureBSCInterval) - : SchemeShardId(schemeShardId) - , DataErasureInterval(dataErasureInterval) - , DataErasureBSCInterval(dataErasureBSCInterval) - , DataErasureWakeupScheduled(false) - , CurrentWakeupInterval(DataErasureInterval) -{} - -void TDataErasureScheduler::CompleteDataErasure(const NActors::TActorContext& ctx) { - Status = EStatus::COMPLETED; - FinishTime = AppData(ctx)->TimeProvider->Now(); - TDuration dataErasureDuration = FinishTime - StartTime; - if (dataErasureDuration > DataErasureInterval) { - StartDataErasure(ctx); - } else { - CurrentWakeupInterval = DataErasureInterval - dataErasureDuration; - ScheduleDataErasureWakeup(ctx); - } -} - -void TDataErasureScheduler::Handle(TEvSchemeShard::TEvWakeupToRunDataErasurePtr& ev, const NActors::TActorContext& ctx) { - Y_UNUSED(ev); - DataErasureWakeupScheduled = false; - StartDataErasure(ctx); -} - -void TDataErasureScheduler::StartDataErasure(const NActors::TActorContext& ctx) { - if (Status == EStatus::IN_PROGRESS_TENANT || Status == EStatus::IN_PROGRESS_BSC) { - return; - } - Generation++; - Status = EStatus::IN_PROGRESS_TENANT; - StartTime = AppData(ctx)->TimeProvider->Now(); - ctx.Send(SchemeShardId, new TEvSchemeShard::TEvRunDataErasure(Generation, StartTime)); -} - -void TDataErasureScheduler::ContinueDataErasure(const NActors::TActorContext& ctx) { - if (Status == EStatus::IN_PROGRESS_TENANT) { - ctx.Send(SchemeShardId, new TEvSchemeShard::TEvRunDataErasure(Generation, StartTime)); - } else if (Status == EStatus::IN_PROGRESS_BSC) { - // do request to BSC - } -} - -void TDataErasureScheduler::ScheduleDataErasureWakeup(const NActors::TActorContext& ctx) { - if (DataErasureWakeupScheduled) { - return; - } - - ctx.Schedule(CurrentWakeupInterval, new TEvSchemeShard::TEvWakeupToRunDataErasure); - DataErasureWakeupScheduled = true; -} - -TDataErasureScheduler::EStatus TDataErasureScheduler::GetStatus() const { - return Status; -} - -ui64 TDataErasureScheduler::GetGeneration() const { - return Generation; -} - -bool TDataErasureScheduler::NeedInitialize() const { - return !IsInitialized; -} - -void TDataErasureScheduler::SetStatus(const EStatus& status) { - Status = status; -} - -TDuration TDataErasureScheduler::GetDataErasureBSCInterval() const { - return DataErasureBSCInterval; -} - -void TDataErasureScheduler::Restore(const TRestoreValues& restoreValues, const NActors::TActorContext& ctx) { - IsInitialized = restoreValues.IsInitialized; - Generation = restoreValues.Generation; - Status = restoreValues.Status; - StartTime = restoreValues.StartTime; - if (Status == EStatus::UNSPECIFIED || Status == EStatus::COMPLETED) { - TDuration interval = AppData(ctx)->TimeProvider->Now() - StartTime; - if (interval > DataErasureInterval) { - CurrentWakeupInterval = TDuration::Zero(); - } else { - CurrentWakeupInterval = DataErasureInterval - interval; - } - } -} - -} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_data_erasure_scheduler.h b/ydb/core/tx/schemeshard/schemeshard_data_erasure_scheduler.h deleted file mode 100644 index 02435a1072b4..000000000000 --- a/ydb/core/tx/schemeshard/schemeshard_data_erasure_scheduler.h +++ /dev/null @@ -1,67 +0,0 @@ -#pragma once - -#include - -#include - -namespace NActors { - struct TActorContext; -} - -namespace NKikimr::NSchemeShard { - -namespace TEvSchemeShard { - struct TEvWakeupToRunDataErasure; - using TEvWakeupToRunDataErasurePtr = TAutoPtr>; -} - -class TSchemeShard; - -class TDataErasureScheduler { -public: - enum class EStatus : ui32 { - UNSPECIFIED, - COMPLETED, - IN_PROGRESS_TENANT, - IN_PROGRESS_BSC, - }; - - struct TRestoreValues { - bool IsInitialized = false; - ui64 Generation = 0; - EStatus Status = EStatus::UNSPECIFIED; - TInstant StartTime; - }; - -public: - TDataErasureScheduler(const NActors::TActorId& schemeShardId, const TDuration& dataErasureInterval, const TDuration& dataErasureBSCInterval); - - void Handle(TEvSchemeShard::TEvWakeupToRunDataErasurePtr& ev, const NActors::TActorContext& ctx); - void ScheduleDataErasureWakeup(const NActors::TActorContext& ctx); - void StartDataErasure(const NActors::TActorContext& ctx); - void ContinueDataErasure(const NActors::TActorContext& ctx); - void CompleteDataErasure(const NActors::TActorContext& ctx); - - void SetStatus(const EStatus& status); - EStatus GetStatus() const; - ui64 GetGeneration() const; - TDuration GetDataErasureBSCInterval() const; - bool NeedInitialize() const; - - void Restore(const TRestoreValues& restoreValues, const NActors::TActorContext& ctx); - -private: - const NActors::TActorId SchemeShardId; - const TDuration DataErasureInterval; - const TDuration DataErasureBSCInterval; - - EStatus Status = EStatus::UNSPECIFIED; - TInstant StartTime; - TInstant FinishTime; - bool DataErasureWakeupScheduled; - ui64 Generation = 0; - TDuration CurrentWakeupInterval; - bool IsInitialized = false; -}; - -} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 6a6023740eb8..675977f0abc1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -3,7 +3,7 @@ #include "schemeshard_svp_migration.h" #include "olap/bg_tasks/adapter/adapter.h" #include "olap/bg_tasks/events/global.h" -#include "schemeshard_data_erasure_scheduler.h" +#include "schemeshard__data_erasure_manager.h" #include #include @@ -128,7 +128,7 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActiva StartStopCompactionQueues(); BackgroundCleaningQueue->Start(); - StartDataErasure(ctx); + StartStopDataErasure(); ctx.Send(TxAllocatorClient, MakeHolder(InitiateCachedTxIdsCount)); @@ -447,8 +447,9 @@ void TSchemeShard::Clear() { UpdateBorrowedCompactionQueueMetrics(); } - ActiveDataErasureTenants.clear(); - ActiveDataErasureShards.clear(); + if (DataErasureManager) { + DataErasureManager->Clear(); + } ClearTempDirsState(); @@ -2251,10 +2252,8 @@ void TSchemeShard::PersistRemoveSubDomain(NIceDb::TNiceDb& db, const TPathId& pa db.Table().Key(pathId.LocalPathId, pool.GetName(), pool.GetKind()).Delete(); } - if (ActiveDataErasureTenants.contains(pathId)) { - db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); - ActiveDataErasureTenants.erase(pathId); - } + db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Update(static_cast(TDataErasureManager::EStatus::COMPLETED)); + DataErasureManager->Remove(pathId); db.Table().Key(pathId.LocalPathId).Delete(); SubDomains.erase(it); @@ -4488,8 +4487,6 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info) , BackgroundCompactionStarter(this) , BorrowedCompactionStarter(this) , BackgroundCleaningStarter(this) - , DataErasureStarter(this) - , TenantDataErasureStarter(this) , ShardDeleter(info->TabletID) , TableStatsQueue(this, COUNTER_STATS_QUEUE_SIZE, @@ -4649,7 +4646,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { MaxCdcInitialScanShardsInFlight = appData->SchemeShardConfig.GetMaxCdcInitialScanShardsInFlight(); ConfigureBackgroundCleaningQueue(appData->BackgroundCleaningConfig, ctx); - ConfigureDataErasure(appData->DataErasureConfig, ctx); + ConfigureDataErasureManager(appData->DataErasureConfig); if (appData->ChannelProfiles) { ChannelProfiles = appData->ChannelProfiles; @@ -4772,15 +4769,6 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvSchemeShard::TEvMeasureSelfResponseTime, SelfPinger->Handle); HFuncTraced(TEvSchemeShard::TEvWakeupToMeasureSelfResponseTime, SelfPinger->Handle); - HFuncTraced(TEvSchemeShard::TEvWakeupToRunDataErasure, DataErasureScheduler->Handle); - - HFuncTraced(TEvSchemeShard::TEvRunDataErasure, Handle); - HFuncTraced(TEvSchemeShard::TEvTenantDataErasureRequest, Handle); - HFuncTraced(TEvDataShard::TEvForceDataCleanupResult, Handle); - HFuncTraced(TEvSchemeShard::TEvTenantDataErasureResponse, Handle); - HFuncTraced(TEvSchemeShard::TEvDataErasureInfoRequest, Handle); - HFuncTraced(TEvBlobStorage::TEvControllerShredResponse, Handle); - //operation initiate msg HFuncTraced(TEvSchemeShard::TEvModifySchemeTransaction, Handle); HFuncTraced(TEvSchemeShard::TEvDescribeScheme, Handle); @@ -4958,6 +4946,16 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvInterconnect::TEvNodeDisconnected, Handle); HFuncTraced(TEvPrivate::TEvRetryNodeSubscribe, Handle); + //data erasure + HFuncTraced(TEvSchemeShard::TEvWakeupToRunDataErasure, DataErasureManager->WakeupToRunDataErasure); + HFuncTraced(TEvSchemeShard::TEvTenantDataErasureRequest, Handle); + HFuncTraced(TEvDataShard::TEvForceDataCleanupResult, Handle); + HFuncTraced(TEvSchemeShard::TEvTenantDataErasureResponse, Handle); + HFuncTraced(TEvSchemeShard::TEvDataErasureInfoRequest, Handle); + HFuncTraced(TEvSchemeShard::TEvDataErasureManualStartupRequest, Handle); + HFuncTraced(TEvBlobStorage::TEvControllerShredResponse, Handle); + HFuncTraced(TEvSchemeShard::TEvWakeupToRunDataErasureBSC, Handle); + default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::FLAT_TX_SCHEMESHARD, @@ -5619,9 +5617,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc BorrowedCompactionHandleDisconnect(tabletId, clientId); ConditionalEraseHandleDisconnect(tabletId, clientId, ctx); - if (IsDomainSchemeShard) { - DataErasureHandleDisconnect(tabletId, clientId, ctx); - } + DataErasureManager->HandleDisconnect(tabletId, clientId, ctx); RestartPipeTx(tabletId, ctx); } @@ -5672,9 +5668,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc BorrowedCompactionHandleDisconnect(tabletId, clientId); ConditionalEraseHandleDisconnect(tabletId, clientId, ctx); - if (IsDomainSchemeShard) { - DataErasureHandleDisconnect(tabletId, clientId, ctx); - } + DataErasureManager->HandleDisconnect(tabletId, clientId, ctx); RestartPipeTx(tabletId, ctx); } @@ -7205,7 +7199,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi if (appConfig.HasDataErasureConfig()) { const auto& dataErasureConfig = appConfig.GetDataErasureConfig(); - ConfigureDataErasure(dataErasureConfig, ctx); + ConfigureDataErasureManager(dataErasureConfig); } if (appConfig.HasSchemeShardConfig()) { @@ -7244,7 +7238,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi BackgroundCleaningQueue->Start(); } - StartDataErasure(ctx); + StartStopDataErasure(); } } @@ -7434,73 +7428,7 @@ void TSchemeShard::ConfigureBackgroundCleaningQueue( << ", InflightLimit# " << cleaningConfig.InflightLimit); } -void TSchemeShard::ConfigureDataErasure( - const NKikimrConfig::TDataErasureConfig& config, - const TActorContext& ctx) -{ - if (IsDomainSchemeShard) { - DataErasureScheduler = new TDataErasureScheduler(SelfId(), TDuration::Seconds(config.GetDataErasureIntervalSeconds()), - TDuration::Seconds(config.GetBlobStorageControllerRequestIntervalSeconds())); - ConfigureDataErasureQueue(config, ctx); - } else { - ConfigureTenantDataErasureQueue(config, ctx); - } -} - -void TSchemeShard::ConfigureDataErasureQueue( - const NKikimrConfig::TDataErasureConfig& config, - const TActorContext& ctx) -{ - TDataErasureQueue::TConfig dataErasureConfig; - - dataErasureConfig.IsCircular = false; - dataErasureConfig.MaxRate = config.GetMaxRate(); - dataErasureConfig.InflightLimit = config.GetInflightLimit(); - dataErasureConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); - - if (DataErasureQueue) { - DataErasureQueue->UpdateConfig(dataErasureConfig); - } else { - DataErasureQueue = new TDataErasureQueue( - dataErasureConfig, - DataErasureStarter); - ctx.RegisterWithSameMailbox(DataErasureQueue); - } - - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "DataErasureQueue configured: Timeout# " << dataErasureConfig.Timeout - << ", Rate# " << DataErasureQueue->GetRate() - << ", WakeupInterval# " << dataErasureConfig.WakeupInterval - << ", InflightLimit# " << dataErasureConfig.InflightLimit); -} - -void TSchemeShard::ConfigureTenantDataErasureQueue( - const NKikimrConfig::TDataErasureConfig& config, - const TActorContext& ctx) -{ - TTenantDataErasureQueue::TConfig tenantDataErasureConfig; - - tenantDataErasureConfig.IsCircular = false; - tenantDataErasureConfig.MaxRate = config.GetMaxRate(); - tenantDataErasureConfig.InflightLimit = config.GetInflightLimit(); - tenantDataErasureConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); - - if (TenantDataErasureQueue) { - TenantDataErasureQueue->UpdateConfig(tenantDataErasureConfig); - } else { - TenantDataErasureQueue = new TTenantDataErasureQueue( - tenantDataErasureConfig, - TenantDataErasureStarter); - ctx.RegisterWithSameMailbox(TenantDataErasureQueue); - } - - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TenantDataErasureQueue configured: Timeout# " << tenantDataErasureConfig.Timeout - << ", Rate# " << TenantDataErasureQueue->GetRate() - << ", WakeupInterval# " << tenantDataErasureConfig.WakeupInterval - << ", InflightLimit# " << tenantDataErasureConfig.InflightLimit); -} - +// void TScheme void TSchemeShard::ConfigureLoginProvider( const ::NKikimrProto::TAuthConfig& config, const TActorContext &ctx) @@ -7562,31 +7490,6 @@ void TSchemeShard::StartStopCompactionQueues() { BorrowedCompactionQueue->Start(); } -void TSchemeShard::StartDataErasure(const TActorContext& ctx) { - if (EnableDataErasure) { - if (IsDomainSchemeShard) { - DataErasureQueue->Start(); - if (DataErasureScheduler->NeedInitialize()) { - Execute(CreateTxDataErasureSchedulerInit(), ctx); - } - if (DataErasureScheduler->GetStatus() == TDataErasureScheduler::EStatus::IN_PROGRESS_TENANT || - DataErasureScheduler->GetStatus() == TDataErasureScheduler::EStatus::IN_PROGRESS_BSC) { - DataErasureScheduler->ContinueDataErasure(ctx); - } else { - DataErasureScheduler->ScheduleDataErasureWakeup(ctx); - } - } else { - TenantDataErasureQueue->Start(); - } - } else { - if (IsDomainSchemeShard) { - DataErasureQueue->Stop(); - } else { - TenantDataErasureQueue->Stop(); - } - } -} - void TSchemeShard::Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr &, const TActorContext &ctx) { LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Subscription to Console has been set up" @@ -7703,27 +7606,49 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvDataErasureInfoRequest::TPtr& ev, c TEvSchemeShard::TEvDataErasureInfoResponse::EStatus status = TEvSchemeShard::TEvDataErasureInfoResponse::EStatus::UNSPECIFIED; - switch (DataErasureScheduler->GetStatus()) { - case TDataErasureScheduler::EStatus::UNSPECIFIED: + switch (DataErasureManager->GetStatus()) { + case TDataErasureManager::EStatus::UNSPECIFIED: status = TEvSchemeShard::TEvDataErasureInfoResponse::EStatus::UNSPECIFIED; break; - case TDataErasureScheduler::EStatus::COMPLETED: + case TDataErasureManager::EStatus::COMPLETED: status = TEvSchemeShard::TEvDataErasureInfoResponse::EStatus::COMPLETED; break; - case TDataErasureScheduler::EStatus::IN_PROGRESS_TENANT: + case TDataErasureManager::EStatus::IN_PROGRESS: status = TEvSchemeShard::TEvDataErasureInfoResponse::EStatus::IN_PROGRESS_TENANT; break; - case TDataErasureScheduler::EStatus::IN_PROGRESS_BSC: + case TDataErasureManager::EStatus::IN_PROGRESS_BSC: status = TEvSchemeShard::TEvDataErasureInfoResponse::EStatus::IN_PROGRESS_BSC; break; } - ctx.Send(ev->Sender, new TEvSchemeShard::TEvDataErasureInfoResponse(DataErasureScheduler->GetGeneration(), status)); + ctx.Send(ev->Sender, new TEvSchemeShard::TEvDataErasureInfoResponse(DataErasureManager->GetGeneration(), status)); +} + +void TSchemeShard::Handle(TEvSchemeShard::TEvDataErasureManualStartupRequest::TPtr&, const TActorContext&) { + RunDataErasure(true); +} + +void TSchemeShard::Handle(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, + "Handle TEvTenantDataErasureRequest, at schemeshard: " << TabletID()); + Execute(CreateTxRunTenantDataErasure(ev), ctx); +} + +void TSchemeShard::Handle(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxCompleteDataErasureShard(ev), ctx); +} + +void TSchemeShard::Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxCompleteDataErasureTenant(ev), ctx); } -void TSchemeShard::Handle(TEvSchemeShard::TEvRunDataErasure::TPtr& ev, const TActorContext& ctx) { +void TSchemeShard::Handle(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, - "Handle TEvRunDataErasure, at schemeshard: " << TabletID()); - Execute(CreateTxRunDataErasure(ev->Get()->Generation, ev->Get()->StartTime), ctx); + "Handle TEvControllerShredResponse, at schemeshard: " << TabletID()); + Execute(CreateTxCompleteDataErasureBSC(ev), ctx); +} + +void TSchemeShard::Handle(TEvSchemeShard::TEvWakeupToRunDataErasureBSC::TPtr&, const TActorContext&) { + DataErasureManager->SendRequestToBSC(); } void TSchemeShard::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext&) { @@ -7917,5 +7842,37 @@ TDuration TSchemeShard::SendBaseStatsToSA() { + RandomNumber(SendStatsIntervalMaxSeconds - SendStatsIntervalMinSeconds)); } +TAutoPtr TSchemeShard::CreateDataErasureManager(const NKikimrConfig::TDataErasureConfig& config) { + if (IsDomainSchemeShard) { + return new TRootDataErasureManager(this, config); + } else { + return new TTenantDataErasureManager(this, config); + } +} + +void TSchemeShard::ConfigureDataErasureManager(const NKikimrConfig::TDataErasureConfig& config) { + if (DataErasureManager) { + DataErasureManager->UpdateConfig(config); + } else { + DataErasureManager = CreateDataErasureManager(config); + } +} + +void TSchemeShard::StartStopDataErasure() { + if (EnableDataErasure) { + DataErasureManager->Start(); + } else { + DataErasureManager->Stop(); + } +} + +void TSchemeShard::MarkFirstRunRootDataErasureManager() { + Execute(CreateTxDataErasureManagerInit(), this->ActorContext()); +} + +void TSchemeShard::RunDataErasure(bool isNewDataErasure) { + Execute(CreateTxRunDataErasure(isNewDataErasure), this->ActorContext()); +} + } // namespace NSchemeShard } // namespace NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 4938fd53b605..c16955c214f4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -19,7 +19,6 @@ #include "schemeshard_schema.h" #include "schemeshard__operation.h" #include "schemeshard__stats.h" -#include "schemeshard_data_erasure_scheduler.h" #include "olap/manager/manager.h" @@ -81,6 +80,8 @@ namespace NSchemeShard { extern const ui64 NEW_TABLE_ALTER_VERSION; +class TDataErasureManager; + class TSchemeShard : public TActor , public NTabletFlatExecutor::TTabletExecutedFlat @@ -174,56 +175,6 @@ class TSchemeShard TSchemeShard* Self; }; - using TDataErasureQueue = NOperationQueue::TOperationQueueWithTimer< - TPathId, - TFifoQueue, - TEvPrivate::EvRunDataErasure, - NKikimrServices::FLAT_TX_SCHEMESHARD, - NKikimrServices::TActivity::DATA_ERASURE>; - - class TDataErasureStarter : public TDataErasureQueue::IStarter { - public: - TDataErasureStarter(TSchemeShard* self) - : Self(self) - { } - - NOperationQueue::EStartStatus StartOperation(const TPathId& pathId) override { - return Self->StartDataErasure(pathId); - } - - void OnTimeout(const TPathId& pathId) override { - Self->OnDataErasureTimeout(pathId); - } - - private: - TSchemeShard* Self; - }; - - using TTenantDataErasureQueue = NOperationQueue::TOperationQueueWithTimer< - TShardIdx, - TFifoQueue, - TEvPrivate::EvRunTenantDataErasure, - NKikimrServices::FLAT_TX_SCHEMESHARD, - NKikimrServices::TActivity::TENANT_DATA_ERASURE>; - - class TTenantDataErasureStarter : public TTenantDataErasureQueue::IStarter { - public: - TTenantDataErasureStarter(TSchemeShard* self) - : Self(self) - { } - - NOperationQueue::EStartStatus StartOperation(const TShardIdx& shardIdx) override { - return Self->StartTenantDataErasure(shardIdx); - } - - void OnTimeout(const TShardIdx& shardIdx) override { - Self->OnTenantDataErasureTimeout(shardIdx); - } - - private: - TSchemeShard* Self; - }; - public: static constexpr ui32 DefaultPQTabletPartitionsCount = 1; static constexpr ui32 MaxPQTabletPartitionsCount = 1000; @@ -358,28 +309,6 @@ class TSchemeShard TBackgroundCleaningStarter BackgroundCleaningStarter; TBackgroundCleaningQueue* BackgroundCleaningQueue = nullptr; - TDataErasureStarter DataErasureStarter; - TDataErasureQueue* DataErasureQueue = nullptr; - - TTenantDataErasureStarter TenantDataErasureStarter; - TTenantDataErasureQueue* TenantDataErasureQueue = nullptr; - - enum class EDataErasureStatus { - UNSPECIFIED, - COMPLETED, - IN_PROGRESS, - }; - - ui64 DataErasureGeneration = 0; - EDataErasureStatus DataErasureStatus = EDataErasureStatus::UNSPECIFIED; - - THashMap ActiveDataErasureTenants; - THashMap RunningDataErasureTenants; - THashMap ActiveDataErasureShards; - THashMap RunningDataErasureShards; - - TAutoPtr DataErasureScheduler; - struct TBackgroundCleaningState { THashSet TxIds; TVector DirsToRemove; @@ -573,18 +502,6 @@ class TSchemeShard const NKikimrConfig::TBackgroundCleaningConfig& config, const TActorContext &ctx); - void ConfigureDataErasure( - const NKikimrConfig::TDataErasureConfig& config, - const TActorContext& ctx); - - void ConfigureDataErasureQueue( - const NKikimrConfig::TDataErasureConfig& config, - const TActorContext& ctx); - - void ConfigureTenantDataErasureQueue( - const NKikimrConfig::TDataErasureConfig& config, - const TActorContext& ctx); - void ConfigureLoginProvider( const ::NKikimrProto::TAuthConfig& config, const TActorContext &ctx); @@ -595,8 +512,6 @@ class TSchemeShard void StartStopCompactionQueues(); - void StartDataErasure(const TActorContext& ctx); - void WaitForTableProfiles(ui64 importId, ui32 itemIdx); void LoadTableProfiles(const NKikimrConfig::TTableProfilesConfig* config, const TActorContext& ctx); @@ -1048,20 +963,6 @@ class TSchemeShard void CleanBackgroundCleaningState(const TPathId& pathId); void ClearTempDirsState(); - void EnqueueDataErasure(const TPathId& pathId); - // void RemoveDataErasure(const TShardIdx& shardIdx); - NOperationQueue::EStartStatus StartDataErasure(const TPathId& pathId); - void OnDataErasureTimeout(const TPathId& pathId); - void UpdateDataErasureQueueMetrics(); - void DataErasureHandleDisconnect(TTabletId tabletId, const TActorId& clientId, const TActorContext& ctx); - - void EnqueueTenantDataErasure(const TShardIdx& shardIdx); - void RemoveTenantDataErasure(const TShardIdx& shardIdx); - NOperationQueue::EStartStatus StartTenantDataErasure(const TShardIdx& shardIdx); - void OnTenantDataErasureTimeout(const TShardIdx& shardIdx); - void UpdateTenantDataErasureQueueMetrics(); - void TenantDataErasureHandleDisconnect(TTabletId tabletId, const TActorId& clientId); - struct TTxCleanDroppedSubDomains; NTabletFlatExecutor::ITransaction* CreateTxCleanDroppedSubDomains(); @@ -1137,17 +1038,23 @@ class TSchemeShard template NTabletFlatExecutor::ITransaction* CreateTxOperationReply(TOperationId id, TEvPtr& ev); + struct TTxDataErasureManagerInit; + NTabletFlatExecutor::ITransaction* CreateTxDataErasureManagerInit(); + + struct TTxRunDataErasure; + NTabletFlatExecutor::ITransaction* CreateTxRunDataErasure(bool isNewDataErasure); + struct TTxRunTenantDataErasure; NTabletFlatExecutor::ITransaction* CreateTxRunTenantDataErasure(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev); - struct TTxRunDataErasure; - NTabletFlatExecutor::ITransaction* CreateTxRunDataErasure(ui64 generation, const TInstant& startTime); + struct TTxCompleteDataErasureShard; + NTabletFlatExecutor::ITransaction* CreateTxCompleteDataErasureShard(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev); - struct TTxCompleteDataErasure; - NTabletFlatExecutor::ITransaction* CreateTxCompleteDataErasure(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev); + struct TTxCompleteDataErasureTenant; + NTabletFlatExecutor::ITransaction* CreateTxCompleteDataErasureTenant(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev); - struct TTxDataErasureSchedulerInit; - NTabletFlatExecutor::ITransaction* CreateTxDataErasureSchedulerInit(); + struct TTxCompleteDataErasureBSC; + NTabletFlatExecutor::ITransaction* CreateTxCompleteDataErasureBSC(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev); void PublishToSchemeBoard(THashMap>&& paths, const TActorContext& ctx); void PublishToSchemeBoard(TTxId txId, TDeque&& paths, const TActorContext& ctx); @@ -1258,9 +1165,9 @@ class TSchemeShard void Handle(TEvSchemeShard::TEvTenantDataErasureRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvForceDataCleanupResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvSchemeShard::TEvTenantDataErasureResponse::TPtr& ev, const TActorContext& ctx); - void Handle(TEvSchemeShard::TEvRunDataErasure::TPtr& ev, const TActorContext& ctx); void Handle(TEvBlobStorage::TEvControllerShredResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvSchemeShard::TEvDataErasureInfoRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSchemeShard::TEvDataErasureManualStartupRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvSchemeShard::TEvWakeupToRunDataErasureBSC::TPtr& ev, const TActorContext& ctx); @@ -1590,7 +1497,11 @@ class TSchemeShard void ConnectToSA(); TDuration SendBaseStatsToSA(); - + TAutoPtr CreateDataErasureManager(const NKikimrConfig::TDataErasureConfig& config); + void ConfigureDataErasureManager(const NKikimrConfig::TDataErasureConfig& config); + void StartStopDataErasure(); + void MarkFirstRunRootDataErasureManager(); + void RunDataErasure(bool isNewDataErasure); public: void ChangeStreamShardsCount(i64 delta) override; @@ -1612,6 +1523,7 @@ class TSchemeShard void SetShardsQuota(ui64 value) override; NLogin::TLoginProvider LoginProvider; + TAutoPtr DataErasureManager = nullptr; private: void OnDetach(const TActorContext &ctx) override; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 8a6b8f622d69..ccd830deebb5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1934,7 +1934,7 @@ struct Schema : NIceDb::Schema { >; }; - struct DataErasureStarts : Table<115> { + struct DataErasureGenerations : Table<115> { struct Generation : Column<1, NScheme::NTypeIds::Uint64> {}; struct Status : Column<2, NScheme::NTypeIds::Uint32> {}; struct StartTime : Column<3, NScheme::NTypeIds::Timestamp> {}; @@ -1947,7 +1947,7 @@ struct Schema : NIceDb::Schema { >; }; - struct ActiveDataErasureTenants : Table<116> { + struct WaitingDataErasureTenants : Table<116> { struct OwnerPathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; struct LocalPathId : Column<2, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; struct Status : Column<3, NScheme::NTypeIds::Uint32> {}; @@ -1960,7 +1960,7 @@ struct Schema : NIceDb::Schema { >; }; - struct TenantDataErasureStarts : Table<117> { + struct TenantDataErasureGenerations : Table<117> { struct Generation : Column<1, NScheme::NTypeIds::Uint64> {}; struct Status : Column<2, NScheme::NTypeIds::Uint32> {}; @@ -1971,7 +1971,7 @@ struct Schema : NIceDb::Schema { >; }; - struct ActiveDataErasureShards : Table<118> { + struct WaitingDataErasureShards : Table<118> { struct OwnerShardIdx : Column<1, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; struct LocalShardIdx : Column<2, NScheme::NTypeIds::Uint64> { using Type = TLocalShardIdx; }; struct Status : Column<3, NScheme::NTypeIds::Uint32> {}; @@ -2096,11 +2096,11 @@ struct Schema : NIceDb::Schema { ResourcePool, BackupCollection, KMeansTreeProgress, - KMeansTreeSample - DataErasureStarts, - ActiveDataErasureTenants, - TenantDataErasureStarts, - ActiveDataErasureShards + KMeansTreeSample, + DataErasureGenerations, + WaitingDataErasureTenants, + TenantDataErasureGenerations, + WaitingDataErasureShards >; static constexpr ui64 SysParam_NextPathId = 1; diff --git a/ydb/core/tx/schemeshard/ut_data_erasure/ut_data_erasure.cpp b/ydb/core/tx/schemeshard/ut_data_erasure/ut_data_erasure.cpp index a2e26cad6f57..0a6f87d978d9 100644 --- a/ydb/core/tx/schemeshard/ut_data_erasure/ut_data_erasure.cpp +++ b/ydb/core/tx/schemeshard/ut_data_erasure/ut_data_erasure.cpp @@ -3,6 +3,7 @@ #include #include #include +#include using namespace NKikimr; using namespace NSchemeShardUT_Private; @@ -57,6 +58,71 @@ ui64 CreateTestSubdomain( return schemeshardId; } +class TFakeBSController : public TActor, public NTabletFlatExecutor::TTabletExecutedFlat { + void DefaultSignalTabletActive(const TActorContext &) override + { + // must be empty + } + + void OnActivateExecutor(const TActorContext &) override + { + Become(&TThis::StateWork); + SignalTabletActive(SelfId()); + } + + void OnDetach(const TActorContext &ctx) override + { + Die(ctx); + } + + void OnTabletDead(TEvTablet::TEvTabletDead::TPtr &, const TActorContext &ctx) override + { + Die(ctx); + } + +public: + TFakeBSController(const TActorId &tablet, TTabletStorageInfo *info) + : TActor(&TThis::StateInit) + , TTabletExecutedFlat(info, tablet, nullptr) + { + } + + STFUNC(StateInit) + { + StateInitImpl(ev, SelfId()); + } + + STFUNC(StateWork) + { + switch (ev->GetTypeRewrite()) { + HFunc(TEvBlobStorage::TEvControllerShredRequest, Handle); + } + } + + void Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr& ev, const TActorContext& ctx) { + auto record = ev->Get()->Record; + if (record.GetGeneration() > Generation) { + Generation = record.GetGeneration(); + Completed = false; + Progress = 0; + } else if (record.GetGeneration() == Generation) { + if (!Completed) { + Progress += 5000; + if (Progress >= 10000) { + Progress = 10000; + Completed = true; + } + } + } + ctx.Send(ev->Sender, new TEvBlobStorage::TEvControllerShredResponse(Generation, Completed, Progress)); + } + +public: + ui64 Generation = 0; + bool Completed = true; + ui32 Progress = 10000; +}; + } // namespace Y_UNIT_TEST_SUITE(TestDataErasure) { @@ -67,12 +133,53 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); - CreateTestBootstrapper(runtime, CreateTestTabletInfo(MakeBSControllerID(), TTabletTypes::BSController), - &CreateFlatBsController); + auto info = CreateTestTabletInfo(MakeBSControllerID(), TTabletTypes::BSController); + CreateTestBootstrapper(runtime, info, [](const TActorId &tablet, TTabletStorageInfo *info) -> IActor* { + return new TFakeBSController(tablet, info); + }); + + runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); + auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; + dataErasureConfig.SetDataErasureIntervalSeconds(3); + dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + + auto sender = runtime.AllocateEdgeActor(); + RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); + + ui64 txId = 100; + + CreateTestSubdomain(runtime, env, &txId, "Database1"); + CreateTestSubdomain(runtime, env, &txId, "Database2"); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 3)); + runtime.DispatchEvents(options); + + auto request = MakeHolder(); + runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + + TAutoPtr handle; + auto response = runtime.GrabEdgeEventRethrow(handle); + + UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 1, response->Record.GetGeneration()); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + } + + Y_UNIT_TEST(DataErasureRun3Cycles) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto info = CreateTestTabletInfo(MakeBSControllerID(), TTabletTypes::BSController); + CreateTestBootstrapper(runtime, info, [](const TActorId &tablet, TTabletStorageInfo *info) -> IActor* { + return new TFakeBSController(tablet, info); + }); runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; - dataErasureConfig.SetDataErasureIntervalSeconds(2); + dataErasureConfig.SetDataErasureIntervalSeconds(3); dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); auto sender = runtime.AllocateEdgeActor(); @@ -83,7 +190,9 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { CreateTestSubdomain(runtime, env, &txId, "Database1"); CreateTestSubdomain(runtime, env, &txId, "Database2"); - env.SimulateSleep(runtime, TDuration::Seconds(10)); + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 9)); + runtime.DispatchEvents(options); auto request = MakeHolder(); runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); @@ -91,7 +200,102 @@ Y_UNIT_TEST_SUITE(TestDataErasure) { TAutoPtr handle; auto response = runtime.GrabEdgeEventRethrow(handle); - UNIT_ASSERT_EQUAL(response->Record.GetGeneration(), 1); - UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::IN_PROGRESS_BSC); + UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 3, response->Record.GetGeneration()); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + } + + Y_UNIT_TEST(DataErasureManualLaunch) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto info = CreateTestTabletInfo(MakeBSControllerID(), TTabletTypes::BSController); + CreateTestBootstrapper(runtime, info, [](const TActorId &tablet, TTabletStorageInfo *info) -> IActor* { + return new TFakeBSController(tablet, info); + }); + + runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); + auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; + dataErasureConfig.SetForceManualStartup(true); + dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + + auto sender = runtime.AllocateEdgeActor(); + RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); + + ui64 txId = 100; + + CreateTestSubdomain(runtime, env, &txId, "Database1"); + CreateTestSubdomain(runtime, env, &txId, "Database2"); + + { + auto request = MakeHolder(); + runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + } + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 3)); + runtime.DispatchEvents(options); + + auto request = MakeHolder(); + runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + + TAutoPtr handle; + auto response = runtime.GrabEdgeEventRethrow(handle); + + UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 1, response->Record.GetGeneration()); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + } + + Y_UNIT_TEST(DataErasureManualLaunch3Cycles) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto info = CreateTestTabletInfo(MakeBSControllerID(), TTabletTypes::BSController); + CreateTestBootstrapper(runtime, info, [](const TActorId &tablet, TTabletStorageInfo *info) -> IActor* { + return new TFakeBSController(tablet, info); + }); + + runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); + auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; + dataErasureConfig.SetForceManualStartup(true); + dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + + auto sender = runtime.AllocateEdgeActor(); + RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); + + ui64 txId = 100; + + CreateTestSubdomain(runtime, env, &txId, "Database1"); + CreateTestSubdomain(runtime, env, &txId, "Database2"); + + auto RunDataErasure = [&runtime] (ui32 expectedGeneration) { + auto sender = runtime.AllocateEdgeActor(); + { + auto request = MakeHolder(); + runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + } + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 3)); + runtime.DispatchEvents(options); + + auto request = MakeHolder(); + runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + + TAutoPtr handle; + auto response = runtime.GrabEdgeEventRethrow(handle); + + UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), expectedGeneration, response->Record.GetGeneration()); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + }; + + RunDataErasure(1); + RunDataErasure(2); + RunDataErasure(3); } } diff --git a/ydb/core/tx/schemeshard/ut_data_erasure/ya.make b/ydb/core/tx/schemeshard/ut_data_erasure/ya.make index 26656e5d129a..1eee45270fb0 100644 --- a/ydb/core/tx/schemeshard/ut_data_erasure/ya.make +++ b/ydb/core/tx/schemeshard/ut_data_erasure/ya.make @@ -4,6 +4,8 @@ FORK_SUBTESTS() SPLIT_FACTOR(10) +TIMEOUT(20) + IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) SIZE(LARGE) TAG(ya:fat) diff --git a/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ut_data_erasure_reboots.cpp b/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ut_data_erasure_reboots.cpp new file mode 100644 index 000000000000..e6255cd7a727 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ut_data_erasure_reboots.cpp @@ -0,0 +1,178 @@ +#include +#include +#include +#include +#include +#include + +using namespace NKikimr; +using namespace NSchemeShard; +using namespace NSchemeShardUT_Private; + +namespace { + + ui64 CreateTestSubdomain( + TTestActorRuntime& runtime, + TTestEnv& env, + ui64* txId, + const TString& name) { + TestCreateExtSubDomain(runtime, ++(*txId), "/MyRoot", Sprintf(R"( + Name: "%s" + )", name.c_str())); + env.TestWaitNotification(runtime, *txId); + + TestAlterExtSubDomain(runtime, ++(*txId), "/MyRoot", Sprintf(R"( + PlanResolution: 50 + Coordinators: 1 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + ExternalSchemeShard: true + ExternalHive: false + Name: "%s" + StoragePools { + Name: "name_%s_kind_hdd-1" + Kind: "common" + } + StoragePools { + Name: "name_%s_kind_hdd-2" + Kind: "external" + } + )", name.c_str(), name.c_str(), name.c_str())); + env.TestWaitNotification(runtime, *txId); + + ui64 schemeshardId; + TestDescribeResult(DescribePath(runtime, TStringBuilder() << "/MyRoot/" << name), { + NLs::PathExist, + NLs::ExtractTenantSchemeshard(&schemeshardId) + }); + + TestCreateTable(runtime, schemeshardId, ++(*txId), TStringBuilder() << "/MyRoot/" << name, + R"____( + Name: "Simple" + Columns { Name: "key1" Type: "Uint32"} + Columns { Name: "Value" Type: "Utf8"} + KeyColumnNames: ["key1"] + UniformPartitionsCount: 2 + )____"); + env.TestWaitNotification(runtime, *txId, schemeshardId); + + return schemeshardId; + } + + class TFakeBSController : public TActor, public NTabletFlatExecutor::TTabletExecutedFlat { + void DefaultSignalTabletActive(const TActorContext &) override + { + // must be empty + } + + void OnActivateExecutor(const TActorContext &) override + { + Become(&TThis::StateWork); + SignalTabletActive(SelfId()); + } + + void OnDetach(const TActorContext &ctx) override + { + Die(ctx); + } + + void OnTabletDead(TEvTablet::TEvTabletDead::TPtr &, const TActorContext &ctx) override + { + Die(ctx); + } + + public: + TFakeBSController(const TActorId &tablet, TTabletStorageInfo *info) + : TActor(&TThis::StateInit) + , TTabletExecutedFlat(info, tablet, nullptr) + { + } + + STFUNC(StateInit) + { + StateInitImpl(ev, SelfId()); + } + + STFUNC(StateWork) + { + switch (ev->GetTypeRewrite()) { + HFunc(TEvBlobStorage::TEvControllerShredRequest, Handle); + } + } + + void Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr& ev, const TActorContext& ctx) { + auto record = ev->Get()->Record; + if (record.GetGeneration() > Generation) { + Generation = record.GetGeneration(); + Completed = false; + Progress = 0; + } else if (record.GetGeneration() == Generation) { + if (!Completed) { + Progress += 5000; + if (Progress >= 10000) { + Progress = 10000; + Completed = true; + } + } + } + ctx.Send(ev->Sender, new TEvBlobStorage::TEvControllerShredResponse(Generation, Completed, Progress)); + } + + public: + ui64 Generation = 0; + bool Completed = true; + ui32 Progress = 10000; + }; + + } // namespace + +Y_UNIT_TEST_SUITE(DataErasureReboots) { + Y_UNIT_TEST(Fake) { + } + + Y_UNIT_TEST(SimpleDataErasureTest) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto info = CreateTestTabletInfo(MakeBSControllerID(), TTabletTypes::BSController); + CreateTestBootstrapper(runtime, info, [](const TActorId &tablet, TTabletStorageInfo *info) -> IActor* { + return new TFakeBSController(tablet, info); + }); + + runtime.GetAppData().FeatureFlags.SetEnableDataErasure(true); + auto& dataErasureConfig = runtime.GetAppData().DataErasureConfig; + dataErasureConfig.SetForceManualStartup(true); + dataErasureConfig.SetBlobStorageControllerRequestIntervalSeconds(1); + + ui64 txId = 100; + + auto tenantSS = CreateTestSubdomain(runtime, *(t.TestEnv), &txId, "Database1"); + auto sender = runtime.AllocateEdgeActor(); + RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); + RebootTablet(runtime, tenantSS, sender); + + { + TInactiveZone inactive(activeZone); + { + auto request = MakeHolder(); + runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + } + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvControllerShredResponse, 3)); + runtime.DispatchEvents(options); + + auto request = MakeHolder(); + runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + + TAutoPtr handle; + auto response = runtime.GrabEdgeEventRethrow(handle); + + UNIT_ASSERT_EQUAL_C(response->Record.GetGeneration(), 1, response->Record.GetGeneration()); + UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NKikimrScheme::TEvDataErasureInfoResponse::COMPLETED); + } + }); + } +} diff --git a/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ya.make b/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ya.make new file mode 100644 index 000000000000..3a823fc34407 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_data_erasure_reboots/ya.make @@ -0,0 +1,28 @@ +UNITTEST_FOR(ydb/core/tx/schemeshard) + +FORK_SUBTESTS() + +SPLIT_FACTOR(10) + +TIMEOUT(900) + +SIZE(LARGE) +TAG(ya:fat) + +PEERDIR( + library/cpp/getopt + library/cpp/regex/pcre + ydb/core/cms + ydb/core/testlib/default + ydb/core/tx + ydb/core/tx/schemeshard/ut_helpers + ydb/core/wrappers/ut_helpers +) + +SRCS( + ut_data_erasure_reboots.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 224a1091e85f..48f631457774 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -70,6 +70,7 @@ SRCS( schemeshard__borrowed_compaction.cpp schemeshard__clean_pathes.cpp schemeshard__conditional_erase.cpp + schemeshard__data_erasure_manager.cpp schemeshard__delete_tablet_reply.cpp schemeshard__describe_scheme.cpp schemeshard__find_subdomain_path_id.cpp @@ -200,8 +201,8 @@ SRCS( schemeshard__unmark_restore_tables.cpp schemeshard__upgrade_access_database.cpp schemeshard__upgrade_schema.cpp - schemeshard__data_erasure.cpp - schemeshard__tenant_data_erasure.cpp + schemeshard__root_data_erasure_manager.cpp + schemeshard__tenant_data_erasure_manager.cpp schemeshard_audit_log.cpp schemeshard_audit_log_fragment.cpp schemeshard_backup.cpp @@ -217,7 +218,6 @@ SRCS( schemeshard_build_index_tx_base.cpp schemeshard_cdc_stream_common.cpp schemeshard_cdc_stream_scan.cpp - schemeshard_data_erasure_scheduler.cpp schemeshard_domain_links.cpp schemeshard_domain_links.h schemeshard_effective_acl.cpp diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 95289c25a1a1..d87592875f8d 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -8150,7 +8150,7 @@ }, { "TableId": 115, - "TableName": "DataErasureScheduler", + "TableName": "DataErasureGenerations", "TableKey": [ 1 ], @@ -8200,7 +8200,7 @@ }, { "TableId": 116, - "TableName": "ActiveDataErasureTenants", + "TableName": "WaitingDataErasureTenants", "TableKey": [ 1, 2 @@ -8251,7 +8251,7 @@ }, { "TableId": 117, - "TableName": "TenantDataErasure", + "TableName": "TenantDataErasureGenerations", "TableKey": [ 1 ], @@ -8295,7 +8295,7 @@ }, { "TableId": 118, - "TableName": "ActiveDataErasureShards", + "TableName": "WaitingDataErasureShards", "TableKey": [ 1, 2 @@ -8344,4 +8344,4 @@ } } } -] +] \ No newline at end of file