Skip to content

Commit

Permalink
Merge de04db2 into c713cd6
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Feb 23, 2024
2 parents c713cd6 + de04db2 commit 6b61d54
Show file tree
Hide file tree
Showing 18 changed files with 153 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot));
LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix());
NOlap::TWriteIndexContext context(txc, dbWrap);
changes->WriteIndexOnExecute(*Self, context);
NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
changes->WriteIndexOnExecute(Self, context);

changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, *context.BlobManagerDb, true);
NOlap::TBlobManagerDb blobManagerDb(txc.DB);
changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, blobManagerDb, true);

Self->UpdateIndexCounters();
} else {
Expand Down Expand Up @@ -57,7 +58,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {

if (!Ev->Get()->IndexChanges->IsAborted()) {
NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
Ev->Get()->IndexChanges->WriteIndexOnComplete(*Self, context);
Ev->Get()->IndexChanges->WriteIndexOnComplete(Self, context);
}

if (Ev->Get()->GetPutStatus() == NKikimrProto::TRYLATER) {
Expand Down
18 changes: 10 additions & 8 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& con
return result;
}

void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
Y_ABORT_UNLESS(Stage != EStage::Aborted);
Y_ABORT_UNLESS(Stage <= EStage::Written);
Y_ABORT_UNLESS(Stage >= EStage::Compiled);
Expand All @@ -44,13 +44,15 @@ void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard& self,
Stage = EStage::Written;
}

void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
Y_ABORT_UNLESS(Stage == EStage::Written);
void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
Y_ABORT_UNLESS(Stage == EStage::Written || !self);
Stage = EStage::Finished;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
DoWriteIndexOnComplete(self, context);
OnFinish(self, context);
self.IncCounter(GetCounterIndex(context.FinishedSuccessfully));
if (self) {
OnFinish(*self, context);
self->IncCounter(GetCounterIndex(context.FinishedSuccessfully));
}

}

Expand Down Expand Up @@ -105,10 +107,10 @@ void TColumnEngineChanges::OnFinish(NColumnShard::TColumnShard& self, TChangesFi
DoOnFinish(self, context);
}

TWriteIndexContext::TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper)
: Txc(txc)
, BlobManagerDb(std::make_shared<TBlobManagerDb>(txc.DB))
TWriteIndexContext::TWriteIndexContext(NTable::TDatabase* db, IDbWrapper& dbWrapper, TColumnEngineForLogs& engineLogs)
: DB(db)
, DBWrapper(dbWrapper)
, EngineLogs(engineLogs)
{

}
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ class TFinalizationContext: TNonCopyable {

class TWriteIndexContext: TNonCopyable {
public:
NTabletFlatExecutor::TTransactionContext& Txc;
std::shared_ptr<TBlobManagerDb> BlobManagerDb;
NTable::TDatabase* DB;
IDbWrapper& DBWrapper;
TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper);
TColumnEngineForLogs& EngineLogs;
TWriteIndexContext(NTable::TDatabase* db, IDbWrapper& dbWrapper, TColumnEngineForLogs& engineLogs);
};

class TChangesFinishContext {
Expand Down Expand Up @@ -146,8 +146,8 @@ class TColumnEngineChanges {
protected:
virtual void DoDebugString(TStringOutput& out) const = 0;
virtual void DoCompile(TFinalizationContext& context) = 0;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) = 0;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) = 0;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) = 0;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) = 0;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0;
virtual bool NeedConstruction() const {
return true;
Expand Down Expand Up @@ -217,8 +217,8 @@ class TColumnEngineChanges {
virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0;
virtual bool NeedWritePortion(const ui32 index) const = 0;

void WriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context);
void WriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context);
void WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context);
void WriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context);

void Compile(TFinalizationContext& context) noexcept;

Expand Down
34 changes: 21 additions & 13 deletions ydb/core/tx/columnshard/engines/changes/cleanup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,39 @@ void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const {
}
}

void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
self.IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
THashSet<ui64> pathIds;
for (auto&& p : PortionsToDrop) {
p.RemoveFromDatabase(context.DBWrapper);
if (self) {
for (auto&& p : PortionsToDrop) {
p.RemoveFromDatabase(context.DBWrapper);

auto removing = BlobsAction.GetRemoving(p);
for (auto&& r : p.Records) {
removing->DeclareRemove((TTabletId)self.TabletID(), r.BlobRange.BlobId);
auto removing = BlobsAction.GetRemoving(p);
for (auto&& r : p.Records) {
removing->DeclareRemove((TTabletId)self->TabletID(), r.BlobRange.BlobId);
}
pathIds.emplace(p.GetPathId());
}
if (context.DB) {
for (auto&& p : pathIds) {
self->TablesManager.TryFinalizeDropPath(*context.DB, p);
}
}
pathIds.emplace(p.GetPathId());
self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum());
}
for (auto&& p: pathIds) {
self.TablesManager.TryFinalizeDropPath(context.Txc, p);
}
}

void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& context) {
void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
for (auto& portionInfo : PortionsToDrop) {
if (!context.EngineLogs.ErasePortion(portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
}
}
context.TriggerActivity = NeedRepeat ? NColumnShard::TBackgroundActivity::Cleanup() : NColumnShard::TBackgroundActivity::None();
if (self) {
self->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
for (auto&& p : PortionsToDrop) {
self->IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum());
}
}
}

void TCleanupColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/cleanup.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ class TCleanupColumnEngineChanges: public TColumnEngineChanges {
THashMap<TString, THashSet<NOlap::TEvictedBlob>> BlobsToForget;
THashMap<TString, std::vector<std::shared_ptr<TPortionInfo>>> StoragePortions;
protected:
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override;

virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
GranuleMeta->OnCompactionStarted();
}

void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
TBase::DoWriteIndexOnComplete(self, context);
self.IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds());
if (self) {
self->IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds());
}
}

void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TCompactColumnEngineChanges: public TChangesWithAppend {
protected:
std::shared_ptr<TGranuleMeta> GranuleMeta;

virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;

virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,13 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
return TConclusionStatus::Success();
}

void TGeneralCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
void TGeneralCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
TBase::DoWriteIndexOnComplete(self, context);
self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL);
self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
if (self) {
self->IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL);
self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
}
}

void TGeneralCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NCompaction {
class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
private:
using TBase = TCompactColumnEngineChanges;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
std::map<NIndexedReader::TSortableBatchPosition, bool> CheckPoints;
void BuildAppendedPortionsByFullBatches(TConstructionContext& context) noexcept;
void BuildAppendedPortionsByChunks(TConstructionContext& context) noexcept;
Expand Down
30 changes: 17 additions & 13 deletions ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

namespace NKikimr::NOlap {

void TInsertColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
void TInsertColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
TBase::DoWriteIndexOnExecute(self, context);
auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId);
for (const auto& insertedData : DataToIndex) {
self.InsertTable->EraseCommittedOnExecute(context.DBWrapper, insertedData, removing);
if (self) {
auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId);
for (const auto& insertedData : DataToIndex) {
self->InsertTable->EraseCommittedOnExecute(context.DBWrapper, insertedData, removing);
}
}
}

Expand All @@ -26,17 +28,19 @@ void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
self.BackgroundController.StartIndexing(*this);
}

void TInsertColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
void TInsertColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
TBase::DoWriteIndexOnComplete(self, context);
for (const auto& insertedData : DataToIndex) {
self.InsertTable->EraseCommittedOnComplete(insertedData);
}
if (!DataToIndex.empty()) {
self.UpdateInsertTableCounters();
if (self) {
for (const auto& insertedData : DataToIndex) {
self->InsertTable->EraseCommittedOnComplete(insertedData);
}
if (!DataToIndex.empty()) {
self->UpdateInsertTableCounters();
}
self->IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, context.BlobsWritten);
self->IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, context.BytesWritten);
self->IncCounter(NColumnShard::COUNTER_INDEXING_TIME, context.Duration.MilliSeconds());
}
self.IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, context.BlobsWritten);
self.IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, context.BytesWritten);
self.IncCounter(NColumnShard::COUNTER_INDEXING_TIME, context.Duration.MilliSeconds());
}

void TInsertColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/indexation.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class TInsertColumnEngineChanges: public TChangesWithAppend {
const TIndexInfo& indexInfo, const TInsertedData& inserted) const;
std::vector<NOlap::TInsertedData> DataToIndex;
protected:
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override;

virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
Expand Down
Loading

0 comments on commit 6b61d54

Please sign in to comment.