Skip to content

Commit

Permalink
split methods for stages execute/complete usage in case modification (y…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Apr 16, 2024
1 parent 06ee677 commit d30c49e
Show file tree
Hide file tree
Showing 24 changed files with 160 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex
auto storage = Self->StoragesManager->GetInsertOperator();
BlobsAction = storage->StartDeclareRemovingAction("TX_CLEANUP");
for (auto& [abortedWriteId, abortedData] : allAborted) {
Self->InsertTable->EraseAborted(dbTable, abortedData, BlobsAction);
Self->InsertTable->EraseAbortedOnExecute(dbTable, abortedData, BlobsAction);
}
BlobsAction->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true);
return true;
}
void TTxInsertTableCleanup::Complete(const TActorContext& /*ctx*/) {
auto allAborted = Self->InsertTable->GetAborted();
for (auto& [abortedWriteId, abortedData] : allAborted) {
Self->InsertTable->EraseAbortedOnComplete(abortedData);
}

Y_ABORT_UNLESS(BlobsAction);
BlobsAction->OnCompleteTxAfterRemoving(*Self, true);
Self->EnqueueBackgroundActivities();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "tx_write_index.h"
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>

namespace NKikimr::NColumnShard {
Expand All @@ -22,7 +23,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot));
LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix());
NOlap::TWriteIndexContext context(txc, dbWrap);
changes->WriteIndex(*Self, context);
changes->WriteIndexOnExecute(*Self, context);

changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, *context.BlobManagerDb, true);

Expand Down Expand Up @@ -55,8 +56,8 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
const ui64 bytesWritten = changes->GetBlobsAction().GetWritingTotalSize();

if (!Ev->Get()->IndexChanges->IsAborted()) {
NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity);
Ev->Get()->IndexChanges->WriteIndexComplete(*Self, context);
NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
Ev->Get()->IndexChanges->WriteIndexOnComplete(*Self, context);
}

if (Ev->Get()->GetPutStatus() == NKikimrProto::TRYLATER) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,11 @@ class TColumnShard
return TablesManager.GetPrimaryIndexAsVerified<T>();
}

template <class T>
T& MutableIndexAs() {
return TablesManager.MutablePrimaryIndexAsVerified<T>();
}

bool HasIndex() const {
return !!TablesManager.GetPrimaryIndex();
}
Expand Down
21 changes: 6 additions & 15 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,20 @@ TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& con
return result;
}

bool TColumnEngineChanges::ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
Y_ABORT_UNLESS(Stage == EStage::Compiled);
Y_ABORT_UNLESS(DoApplyChanges(self, context));
Stage = EStage::Applied;
return true;
}

void TColumnEngineChanges::WriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
Y_ABORT_UNLESS(Stage != EStage::Aborted);
if ((ui32)Stage >= (ui32)EStage::Written) {
return;
}
Y_ABORT_UNLESS(Stage == EStage::Applied);
Y_ABORT_UNLESS(Stage <= EStage::Written);
Y_ABORT_UNLESS(Stage >= EStage::Compiled);

DoWriteIndex(self, context);
DoWriteIndexOnExecute(self, context);
Stage = EStage::Written;
}

void TColumnEngineChanges::WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
Y_ABORT_UNLESS(Stage == EStage::Written);
Stage = EStage::Finished;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
DoWriteIndexComplete(self, context);
DoWriteIndexOnComplete(self, context);
OnFinish(self, context);
self.IncCounter(GetCounterIndex(context.FinishedSuccessfully));

Expand Down
26 changes: 7 additions & 19 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,20 @@ class TWriteIndexCompleteContext: TNonCopyable, public TChangesFinishContext {
const ui64 BytesWritten;
const TDuration Duration;
NColumnShard::TBackgroundActivity& TriggerActivity;
TColumnEngineForLogs& EngineLogs;
TWriteIndexCompleteContext(const TActorContext& actorContext, const ui32 blobsWritten, const ui64 bytesWritten
, const TDuration d, NColumnShard::TBackgroundActivity& triggerActivity)
, const TDuration d, NColumnShard::TBackgroundActivity& triggerActivity, TColumnEngineForLogs& engineLogs)
: ActorContext(actorContext)
, BlobsWritten(blobsWritten)
, BytesWritten(bytesWritten)
, Duration(d)
, TriggerActivity(triggerActivity)
, EngineLogs(engineLogs)
{

}
};

class TApplyChangesContext: TNonCopyable {
public:
IDbWrapper& DB;
const TSnapshot Snapshot;
TApplyChangesContext(IDbWrapper& db, const TSnapshot& snapshot)
: DB(db)
, Snapshot(snapshot) {

}
};

class TConstructionContext: TNonCopyable {
public:
const TVersionedIndex& SchemaVersions;
Expand All @@ -146,7 +137,6 @@ class TColumnEngineChanges {
Started,
Constructed,
Compiled,
Applied,
Written,
Finished,
Aborted
Expand All @@ -156,10 +146,9 @@ class TColumnEngineChanges {
protected:
virtual void DoDebugString(TStringOutput& out) const = 0;
virtual void DoCompile(TFinalizationContext& context) = 0;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) = 0;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) = 0;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) = 0;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) = 0;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) = 0;
virtual bool NeedConstruction() const {
return true;
}
Expand Down Expand Up @@ -223,14 +212,13 @@ class TColumnEngineChanges {

void Abort(NColumnShard::TColumnShard& self, TChangesFinishContext& context);
void Start(NColumnShard::TColumnShard& self);
bool ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context);

virtual ui32 GetWritePortionsCount() const = 0;
virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0;
virtual bool NeedWritePortion(const ui32 index) const = 0;

void WriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context);
void WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context);
void WriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context);
void WriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context);

void Compile(TFinalizationContext& context) noexcept;

Expand Down
17 changes: 6 additions & 11 deletions ydb/core/tx/columnshard/engines/changes/cleanup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const {
}
}

void TCleanupColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
self.IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
THashSet<ui64> pathIds;
for (auto&& p : PortionsToDrop) {
p.RemoveFromDatabase(context.DBWrapper);

auto removing = BlobsAction.GetRemoving(p);
for (auto&& r : p.Records) {
removing->DeclareRemove((TTabletId)self.TabletID(), r.BlobRange.BlobId);
Expand All @@ -31,26 +33,19 @@ void TCleanupColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self,
}
}

bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& context) {
for (auto& portionInfo : PortionsToDrop) {
if (!self.ErasePortion(portionInfo)) {
if (!context.EngineLogs.ErasePortion(portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
continue;
}
portionInfo.RemoveFromDatabase(context.DB);
}

return true;
context.TriggerActivity = NeedRepeat ? NColumnShard::TBackgroundActivity::Cleanup() : NColumnShard::TBackgroundActivity::None();
}

void TCleanupColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
self.BackgroundController.StartCleanup();
}

void TCleanupColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& context) {
context.TriggerActivity = NeedRepeat ? NColumnShard::TBackgroundActivity::Cleanup() : NColumnShard::TBackgroundActivity::None();
}

void TCleanupColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) {
self.BackgroundController.FinishCleanup();
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/changes/cleanup.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ class TCleanupColumnEngineChanges: public TColumnEngineChanges {
THashMap<TString, THashSet<NOlap::TEvictedBlob>> BlobsToForget;
THashMap<TString, std::vector<std::shared_ptr<TPortionInfo>>> StoragePortions;
protected:
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;

virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) override;
virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
virtual void DoCompile(TFinalizationContext& /*context*/) override {
}
virtual TConclusionStatus DoConstructBlobs(TConstructionContext& /*context*/) noexcept override {
Expand Down
12 changes: 2 additions & 10 deletions ydb/core/tx/columnshard/engines/changes/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
}
}

bool TCompactColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
return TBase::DoApplyChanges(self, context);
}

void TCompactColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
TBase::DoWriteIndex(self, context);
}

void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
TBase::DoStart(self);

Expand All @@ -52,8 +44,8 @@ void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
GranuleMeta->OnCompactionStarted();
}

void TCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
TBase::DoWriteIndexComplete(self, context);
void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
TBase::DoWriteIndexOnComplete(self, context);
self.IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds());
}

Expand Down
5 changes: 2 additions & 3 deletions ydb/core/tx/columnshard/engines/changes/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ class TCompactColumnEngineChanges: public TChangesWithAppend {
protected:
std::shared_ptr<TGranuleMeta> GranuleMeta;

virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;

virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoCompile(TFinalizationContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) override;
virtual TPortionMeta::EProduced GetResultProducedClass() const = 0;
virtual void OnAbortEmergency() override {
NeedGranuleStatusProvide = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
return TConclusionStatus::Success();
}

void TGeneralCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
TBase::DoWriteIndexComplete(self, context);
void TGeneralCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
TBase::DoWriteIndexOnComplete(self, context);
self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL);
self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NCompaction {
class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
private:
using TBase = TCompactColumnEngineChanges;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
std::map<NIndexedReader::TSortableBatchPosition, bool> CheckPoints;
void BuildAppendedPortionsByFullBatches(TConstructionContext& context) noexcept;
void BuildAppendedPortionsByChunks(TConstructionContext& context) noexcept;
Expand Down
25 changes: 11 additions & 14 deletions ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,11 @@

namespace NKikimr::NOlap {

bool TInsertColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
if (!TBase::DoApplyChanges(self, context)) {
return false;
}
return true;
}

void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
TBase::DoWriteIndex(self, context);
void TInsertColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
TBase::DoWriteIndexOnExecute(self, context);
auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId);
for (const auto& insertedData : DataToIndex) {
self.InsertTable->EraseCommitted(context.DBWrapper, insertedData, removing);
}
if (!DataToIndex.empty()) {
self.UpdateInsertTableCounters();
self.InsertTable->EraseCommittedOnExecute(context.DBWrapper, insertedData, removing);
}
}

Expand All @@ -36,7 +26,14 @@ void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
self.BackgroundController.StartIndexing(*this);
}

void TInsertColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
void TInsertColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
TBase::DoWriteIndexOnComplete(self, context);
for (const auto& insertedData : DataToIndex) {
self.InsertTable->EraseCommittedOnComplete(insertedData);
}
if (!DataToIndex.empty()) {
self.UpdateInsertTableCounters();
}
self.IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, context.BlobsWritten);
self.IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, context.BytesWritten);
self.IncCounter(NColumnShard::COUNTER_INDEXING_TIME, context.Duration.MilliSeconds());
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/changes/indexation.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ class TInsertColumnEngineChanges: public TChangesWithAppend {
const TIndexInfo& indexInfo, const TInsertedData& inserted) const;
std::vector<NOlap::TInsertedData> DataToIndex;
protected:
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;

virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) override;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override;
virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override;
virtual ui64 DoCalcMemoryForUsage() const override {
Expand Down
Loading

0 comments on commit d30c49e

Please sign in to comment.