Skip to content

Commit

Permalink
Merge 11a8c5f into 6ba26f5
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 2, 2025
2 parents 6ba26f5 + 11a8c5f commit 919c6d6
Show file tree
Hide file tree
Showing 19 changed files with 337 additions and 189 deletions.
11 changes: 1 addition & 10 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,16 +425,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0);
TInstant start = Now();
ui32 compactionsStart = csController->GetCompactionStartedCounter().Val();
while (Now() - start < TDuration::Seconds(10)) {
if (compactionsStart != csController->GetCompactionStartedCounter().Val()) {
compactionsStart = csController->GetCompactionStartedCounter().Val();
start = Now();
}
Cerr << "WAIT_COMPACTION: " << csController->GetCompactionStartedCounter().Val() << Endl;
Sleep(TDuration::Seconds(1));
}
csController->WaitCompactions(TDuration::Seconds(5));
// important checker for control compactions (<=21) and control indexes constructed (>=21)
AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 21)("count", csController->GetCompactionStartedCounter().Val());

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class TChangesWithAppend;
class TCompactColumnEngineChanges;
class TInsertColumnEngineChanges;
class TStoragesManager;
class TRemovePortionsChange;
class TMovePortionsChange;

namespace NReader {
class TTxScan;
Expand Down Expand Up @@ -206,6 +208,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NOlap::NReader::TTxInternalScan;
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;
friend class NOlap::NReader::NSimple::TIndexScannerConstructor;
friend class NOlap::TRemovePortionsChange;
friend class NOlap::TMovePortionsChange;

class TStoragesManager;
friend class TTxController;
Expand Down
55 changes: 27 additions & 28 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
#pragma once
#include "settings.h"
#include <ydb/core/protos/counters_columnshard.pb.h>

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/action.h>
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
#include <ydb/core/tx/columnshard/counters/indexation.h>
#include <ydb/core/tx/columnshard/data_locks/locks/abstract.h>
#include <ydb/core/tx/columnshard/data_locks/locks/composite.h>
#include <ydb/core/tx/columnshard/data_locks/locks/list.h>
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
#include <ydb/core/tx/columnshard/splitter/settings.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>

#include <util/generic/string.h>
#include <util/datetime/base.h>
#include <util/stream/str.h>
#include <util/generic/guid.h>
#include <util/generic/string.h>
#include <util/stream/str.h>

#include <compare>

namespace NKikimr::NTabletFlatExecutor {
Expand All @@ -30,7 +31,7 @@ class TTransactionContext;
namespace NKikimr::NColumnShard {
class TBlobManagerDb;
class TColumnShard;
}
} // namespace NKikimr::NColumnShard

namespace NKikimr::NOlap {
class TColumnEngineForLogs;
Expand All @@ -43,12 +44,13 @@ class TPortionEvictionFeatures {
std::optional<TString> TargetTierName;
const TString CurrentTierName;
std::optional<NActualizer::TRWAddress> RWAddress;

public:
TPortionEvictionFeatures(const std::shared_ptr<ISnapshotSchema>& currentScheme, const std::shared_ptr<ISnapshotSchema>& targetScheme, const TString& currentTierName)
TPortionEvictionFeatures(const std::shared_ptr<ISnapshotSchema>& currentScheme, const std::shared_ptr<ISnapshotSchema>& targetScheme,
const TString& currentTierName)
: CurrentScheme(currentScheme)
, TargetScheme(targetScheme)
, CurrentTierName(currentTierName)
{
, CurrentTierName(currentTierName) {
AFL_VERIFY(CurrentTierName);
}

Expand Down Expand Up @@ -81,7 +83,8 @@ class TPortionEvictionFeatures {
NActualizer::TRWAddress GetRWAddress() {
if (!RWAddress) {
AFL_VERIFY(TargetTierName);
RWAddress = NActualizer::TRWAddress(CurrentScheme->GetIndexInfo().GetUsedStorageIds(CurrentTierName), TargetScheme->GetIndexInfo().GetUsedStorageIds(*TargetTierName));
RWAddress = NActualizer::TRWAddress(CurrentScheme->GetIndexInfo().GetUsedStorageIds(CurrentTierName),
TargetScheme->GetIndexInfo().GetUsedStorageIds(*TargetTierName));
}
return *RWAddress;
}
Expand All @@ -106,12 +109,12 @@ class TFinalizationContext: TNonCopyable {
ui64* LastGranuleId;
ui64* LastPortionId;
const TSnapshot Snapshot;

public:
TFinalizationContext(ui64& lastGranuleId, ui64& lastPortionId, const TSnapshot& snapshot)
: LastGranuleId(&lastGranuleId)
, LastPortionId(&lastPortionId)
, Snapshot(snapshot) {

}

ui64 NextGranuleId() {
Expand Down Expand Up @@ -141,7 +144,6 @@ class TChangesFinishContext {
TChangesFinishContext(const TString& errorMessage)
: FinishedSuccessfully(false)
, ErrorMessage(errorMessage) {

}

TChangesFinishContext() = default;
Expand All @@ -150,6 +152,7 @@ class TChangesFinishContext {
class TWriteIndexCompleteContext: TNonCopyable, public TChangesFinishContext {
private:
using TBase = TChangesFinishContext;

public:
const TActorContext& ActorContext;
const ui32 BlobsWritten;
Expand All @@ -164,9 +167,7 @@ class TWriteIndexCompleteContext: TNonCopyable, public TChangesFinishContext {
, BytesWritten(bytesWritten)
, Duration(d)
, EngineLogs(engineLogs)
, Snapshot(snapshot)
{

, Snapshot(snapshot) {
}
};

Expand All @@ -176,12 +177,11 @@ class TConstructionContext: TNonCopyable {
const NColumnShard::TIndexationCounters Counters;
const NOlap::TSnapshot LastCommittedTx;

TConstructionContext(const TVersionedIndex& schemaVersions, const NColumnShard::TIndexationCounters& counters, const NOlap::TSnapshot& lastCommittedTx)
TConstructionContext(
const TVersionedIndex& schemaVersions, const NColumnShard::TIndexationCounters& counters, const NOlap::TSnapshot& lastCommittedTx)
: SchemaVersions(schemaVersions)
, Counters(counters)
, LastCommittedTx(lastCommittedTx)
{

, LastCommittedTx(lastCommittedTx) {
}
};

Expand All @@ -200,7 +200,7 @@ class TDataAccessorsInitializationContext {

class TColumnEngineChanges {
public:
enum class EStage: ui32 {
enum class EStage : ui32 {
Created = 0,
Started,
Constructed,
Expand All @@ -209,6 +209,7 @@ class TColumnEngineChanges {
Finished,
Aborted
};

private:
EStage Stage = EStage::Created;
std::shared_ptr<NDataLocks::TManager::TGuard> LockGuard;
Expand All @@ -221,7 +222,8 @@ class TColumnEngineChanges {
virtual NDataLocks::ELockCategory GetLockCategory() const = 0;
virtual void DoDebugString(TStringOutput& out) const = 0;
virtual void DoCompile(TFinalizationContext& context) = 0;
virtual void DoOnAfterCompile() {}
virtual void DoOnAfterCompile() {
}
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) = 0;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) = 0;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0;
Expand Down Expand Up @@ -310,9 +312,7 @@ class TColumnEngineChanges {
}

TColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager, const NBlobOperations::EConsumer consumerId)
: BlobsAction(storagesManager, consumerId)
{

: BlobsAction(storagesManager, consumerId) {
}

TConclusionStatus ConstructBlobs(TConstructionContext& context) noexcept;
Expand Down Expand Up @@ -342,7 +342,7 @@ class TColumnEngineChanges {

std::vector<std::shared_ptr<IBlobsReadingAction>> GetReadingActions() const {
auto result = BlobsAction.GetReadingActions();
// Y_ABORT_UNLESS(result.size());
// Y_ABORT_UNLESS(result.size());
return result;
}
virtual TString TypeString() const = 0;
Expand All @@ -351,7 +351,6 @@ class TColumnEngineChanges {
ui64 TotalBlobsSize() const {
return Blobs.GetTotalBlobsSize();
}

};

}
} // namespace NKikimr::NOlap
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/abstract/changes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "changes.h"

namespace NKikimr::NOlap {

}
37 changes: 37 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/abstract/changes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once
#include "abstract.h"

#include <ydb/core/tx/columnshard/data_locks/locks/abstract.h>

namespace NKikimr::NColumnShard {
class TColumnShard;
}

namespace NKikimr::NOlap {

class IChangeAction {
private:
virtual std::shared_ptr<NDataLocks::ILock> DoBuildDataLock(const TString& id, const NDataLocks::ELockCategory lockCategory) const = 0;
virtual void DoApplyOnExecute(
NColumnShard::TColumnShard* self, TWriteIndexContext& context, const TDataAccessorsResult& fetchedDataAccessors) = 0;
virtual void DoApplyOnComplete(
NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context, const TDataAccessorsResult& fetchedDataAccessors) = 0;

public:
IChangeAction() = default;

std::shared_ptr<NDataLocks::ILock> BuildDataLock(const TString& id, const NDataLocks::ELockCategory lockCategory) const {
return DoBuildDataLock(id, lockCategory);
}

void ApplyOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context, const TDataAccessorsResult& fetchedDataAccessors) {
return DoApplyOnExecute(self, context, fetchedDataAccessors);
}

void ApplyOnComplete(
NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context, const TDataAccessorsResult& fetchedDataAccessors) {
return DoApplyOnComplete(self, context, fetchedDataAccessors);
}
};

} // namespace NKikimr::NOlap
48 changes: 48 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/abstract/move_portions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include "move_portions.h"

#include <ydb/core/tx/columnshard/counters/portions.h>
#include <ydb/core/tx/columnshard/engines/changes/counters/general.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

namespace NKikimr::NOlap {

std::shared_ptr<NDataLocks::ILock> TMovePortionsChange::DoBuildDataLock(const TString& id, const NDataLocks::ELockCategory lockCategory) const {
THashSet<TPortionAddress> portions;
for (auto&& i : Portions) {
AFL_VERIFY(portions.emplace(i.first).second);
}
return std::make_shared<NDataLocks::TListPortionsLock>(id, portions, lockCategory);
}

void TMovePortionsChange::DoApplyOnExecute(
NColumnShard::TColumnShard* self, TWriteIndexContext& context, const TDataAccessorsResult& fetchedDataAccessor) {
auto schemaPtr = context.EngineLogs.GetVersionedIndex().GetLastSchema();
for (auto&& [_, i] : Portions) {
const auto pred = [&](TPortionInfo& portionCopy) {
portionCopy.MutableMeta().ResetCompactionLevel(TargetCompactionLevel.value_or(0));
};
context.EngineLogs.GetGranuleVerified(i->GetPathId())
.ModifyPortionOnExecute(context.DBWrapper, fetchedDataAccessor.GetPortionAccessorVerified(i->GetPortionId()), pred,
schemaPtr->GetIndexInfo().GetPKFirstColumnId());
}
}

void TMovePortionsChange::DoApplyOnComplete(
NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context, const TDataAccessorsResult& /*fetchedDataAccessor*/) {
if (!Portions.size()) {
return;
}
THashMap<ui32, TSimplePortionsGroupInfo> portionGroups;
for (auto&& [_, i] : Portions) {
portionGroups[i->GetMeta().GetCompactionLevel()].AddPortion(i);
}
NChanges::TGeneralCompactionCounters::OnMovePortionsByLevel(portionGroups, TargetCompactionLevel.value_or(0));
for (auto&& [_, i] : Portions) {
const auto pred = [&](const std::shared_ptr<TPortionInfo>& portion) {
portion->MutableMeta().ResetCompactionLevel(TargetCompactionLevel.value_or(0));
};
context.EngineLogs.ModifyPortionOnComplete(i, pred);
}
}

} // namespace NKikimr::NOlap
44 changes: 44 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/abstract/move_portions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once
#include "changes.h"

#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>

namespace NKikimr::NOlap {

class TMovePortionsChange: public IChangeAction {
private:
THashMap<TPortionAddress, std::shared_ptr<const TPortionInfo>> Portions;
YDB_READONLY_DEF(std::optional<ui64>, TargetCompactionLevel);

virtual std::shared_ptr<NDataLocks::ILock> DoBuildDataLock(const TString& id, const NDataLocks::ELockCategory lockCategory) const override;
virtual void DoApplyOnExecute(
NColumnShard::TColumnShard* self, TWriteIndexContext& context, const TDataAccessorsResult& fetchedDataAccessor) override;
virtual void DoApplyOnComplete(
NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context, const TDataAccessorsResult& fetchedDataAccessor) override;

public:
const THashMap<TPortionAddress, TPortionInfo::TConstPtr>& GetPortionsToRemove() const {
return Portions;
}

ui32 GetSize() const {
return Portions.size();
}

bool HasPortions() const {
return Portions.size();
}

void AddPortions(const std::vector<std::shared_ptr<TPortionInfo>>& portions) {
for (auto&& i : portions) {
AFL_VERIFY(i);
AFL_VERIFY(Portions.emplace(i->GetAddress(), i).second)("portion_id", i->GetPortionId());
}
}

void SetTargetCompactionLevel(const ui64 level) {
TargetCompactionLevel = level;
}
};

} // namespace NKikimr::NOlap
Loading

0 comments on commit 919c6d6

Please sign in to comment.