Skip to content

Commit

Permalink
fix normalization processing (#11583)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 14, 2024
1 parent 4239317 commit 1c441a9
Show file tree
Hide file tree
Showing 26 changed files with 448 additions and 52 deletions.
24 changes: 16 additions & 8 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class TTxUpdateSchema: public TTransactionBase<TColumnShard> {
};

bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"process", "TTxUpdateSchema::Execute");
ACFL_INFO("step", "TTxUpdateSchema.Execute_Start")("details", Self->NormalizerController.DebugString());

while (!Self->NormalizerController.IsNormalizationFinished()) {
Expand All @@ -153,6 +153,8 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxUpdateSchema::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxUpdateSchema::Complete");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete");
Self->Counters.GetCSCounters().Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant);
if (NormalizerTasks.empty()) {
Expand Down Expand Up @@ -185,32 +187,34 @@ class TTxApplyNormalizer: public TTransactionBase<TColumnShard> {
}

private:
bool NormalizerFinished = false;
NOlap::INormalizerChanges::TPtr Changes;
};

bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "TTxApplyNormalizer::Execute");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString());
if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) {
return false;
}

if (Self->NormalizerController.GetNormalizer()->GetActiveTasksCount() == 1) {
if (Self->NormalizerController.GetNormalizer()->DecActiveCounters() == 0) {
NormalizerFinished = true;
NIceDb::TNiceDb db(txc.DB);
Self->NormalizerController.OnNormalizerFinished(db);
}
return true;
}

void TTxApplyNormalizer::Complete(const TActorContext& ctx) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Complete")("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"event", "TTxApplyNormalizer::Complete");
AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "apply_normalizer_changes")(
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")(
"details", Self->NormalizerController.DebugString())("size", Changes->GetSize());
Changes->ApplyOnComplete(Self->NormalizerController);
Self->NormalizerController.GetNormalizer()->OnResultReady();
if (Self->NormalizerController.GetNormalizer()->HasActiveTasks()) {
if (!NormalizerFinished) {
return;
}

Expand Down Expand Up @@ -240,6 +244,8 @@ class TTxInitSchema: public TTransactionBase<TColumnShard> {
};

bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"process", "TTxInitSchema::Execute");
LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID());

const bool isFirstRun = txc.DB.GetScheme().IsEmpty();
Expand Down Expand Up @@ -286,6 +292,8 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxInitSchema::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxInitSchema::Complete");
Self->Counters.GetCSCounters().Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant);
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
Self->Execute(new TTxUpdateSchema(Self), ctx);
Expand Down
25 changes: 20 additions & 5 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1278,10 +1278,19 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
TBlobGroupSelector selector(Self->Info());
bool reask = false;
for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("size", i.second.size())("path_id", i.first);
for (auto&& p : i.second) {
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
}
}
{
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
}
}
}
}
Expand All @@ -1290,17 +1299,18 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
}

for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first);
while (i.second.size()) {
auto p = i.second.back();
std::vector<NOlap::TColumnChunkLoadContextV1> records;
std::vector<NOlap::TIndexChunkLoadContext> indexes;
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
}
while (!rowset.EndOfSet()) {
records.emplace_back(NOlap::TColumnChunkLoadContextV1(rowset));
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
if (!rowset.Next()) {
return false;
}
Expand All @@ -1321,8 +1331,11 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
FetchedAccessors.emplace_back(NOlap::TPortionAccessorConstructor::BuildForLoading(p, std::move(records), std::move(indexes)));
i.second.pop_back();
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())(
"path_id", i.first);
}

AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished");
FetchCallback->OnAccessorsFetched(std::move(FetchedAccessors));
return true;
}
Expand Down Expand Up @@ -1439,7 +1452,9 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvPrivate::TEvTieringModified, Handle);
HFunc(TEvPrivate::TEvNormalizerResult, Handle);
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
default:
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected event in enqueue");
return NTabletFlatExecutor::TTabletExecutedFlat::Enqueue(ev);
}
}
Expand Down
42 changes: 40 additions & 2 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ struct Schema : NIceDb::Schema {
RepairsTableId,
NormalizersTableId,
NormalizerEventsTableId,
ColumnsV1TableId
ColumnsV1TableId,
ColumnsV2TableId
};

enum class ETierTables: ui32 {
Expand Down Expand Up @@ -569,6 +570,15 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<PathId, PortionId, SSColumnId, ChunkIdx, Metadata, BlobIdx, Offset, Size>;
};

struct IndexColumnsV2: Table<ColumnsV2TableId> {
struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {};
struct Metadata: Column<3, NScheme::NTypeIds::String> {};

using TKey = TableKey<PathId, PortionId>;
using TColumns = TableColumns<PathId, PortionId, Metadata>;
};

using TTables = SchemaTables<
Value,
TxInfo,
Expand Down Expand Up @@ -607,7 +617,8 @@ struct Schema : NIceDb::Schema {
TxDependencies,
TxStates,
TxEvents,
IndexColumnsV1
IndexColumnsV1,
IndexColumnsV2
>;

//
Expand Down Expand Up @@ -997,6 +1008,33 @@ class TColumnChunkLoadContextV1 {
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);

public:
TPortionAddress GetPortionAddress() const {
return TPortionAddress(PathId, PortionId);
}

template <class TSource>
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
for (auto&& i : metaProto.GetChunks()) {
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
records.emplace_back(std::move(result));
}
}

NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
NKikimrTxColumnShard::TColumnChunkInfo proto;
proto.SetSSColumnId(Address.GetColumnId());
proto.SetChunkIdx(Address.GetChunkIdx());
*proto.MutableChunkMetadata() = MetaProto;
*proto.MutableBlobRangeLink() = BlobRange.SerializeToProto();
return proto;
}

TFullChunkAddress GetFullChunkAddress() const {
return TFullChunkAddress(PathId, PortionId, Address.GetEntityId(), Address.GetChunkIdx());
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/data_accessor/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class TActor: public TActorBootstrapped<TActor> {
void Bootstrap();

STFUNC(StateWait) {
const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("self_id", SelfId())("tablet_id", TabletId)("parent", Parent);
switch (ev->GetTypeRewrite()) {
cFunc(NActors::TEvents::TEvPoison::EventType, StartStopping);
hFunc(TEvRegisterController, Handle);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/data_accessor/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class TLocalManager: public IDataAccessorsManager {
const std::shared_ptr<IAccessorCallback> AccessorCallback;

virtual void DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) override {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString());
for (auto&& i : request->GetPathIds()) {
auto it = Managers.find(i);
if (it == Managers.end()) {
Expand Down
21 changes: 18 additions & 3 deletions ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>

Expand Down Expand Up @@ -61,7 +62,7 @@ class TDataAccessorsResult {
}
};

class IDataAccessorRequestsSubscriber {
class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCounter<IDataAccessorRequestsSubscriber> {
private:
THashSet<ui64> RequestIds;

Expand Down Expand Up @@ -95,7 +96,6 @@ class IDataAccessorRequestsSubscriber {
class TFakeDataAccessorsSubscriber: public IDataAccessorRequestsSubscriber {
private:
virtual void DoOnRequestsFinished(TDataAccessorsResult&& /*result*/) override {

}
};

Expand All @@ -117,6 +117,12 @@ class TPathFetchingState {
THashMap<ui64, TPortionDataAccessor> PortionAccessors;

public:
TString DebugString() const {
TStringBuilder sb;
sb << "portions_count=" << Portions.size();
return sb;
}

TPathFetchingState(const ui64 pathId)
: PathId(pathId) {
}
Expand Down Expand Up @@ -161,7 +167,7 @@ class TPathFetchingState {
}
};

class TDataAccessorsRequest {
class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDataAccessorsRequest> {
private:
static inline TAtomicCounter Counter = 0;
ui32 FetchStage = 0;
Expand Down Expand Up @@ -191,6 +197,15 @@ class TDataAccessorsRequest {
}

public:
TString DebugString() const {
TStringBuilder sb;
sb << "request_id=" << RequestId << ";";
for (auto&& i : PathIdStatus) {
sb << i.first << "={" << i.second.DebugString() << "};";
}
return sb;
}

TDataAccessorsRequest() = default;

bool HasSubscriber() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
namespace NKikimr::NOlap::NActualizer {

TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
: MemoryUsageLimit(memoryUsageLimit)
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
: VersionedIndex(versionedIndex)
, MemoryUsageLimit(memoryUsageLimit)
, SaverContext(saverContext)
, Counters(counters)
, Controller(controller)
Expand All @@ -31,10 +33,10 @@ bool TTieringProcessContext::AddPortion(
};
auto it = Tasks.find(features.GetRWAddress());
if (it == Tasks.end()) {
std::vector<TTaskConstructor> tasks = {buildNewTask()};
std::vector<TTaskConstructor> tasks = { buildNewTask() };
it = Tasks.emplace(features.GetRWAddress(), std::move(tasks)).first;
}
if (it->second.back().GetTxWriteVolume() + info->GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2 && it->second.back().GetTxWriteVolume()) {
if (!it->second.back().CanTakePortionInTx(info, VersionedIndex)) {
if (Controller->IsNewTaskAvailable(it->first, it->second.size())) {
it->second.emplace_back(buildNewTask());
} else {
Expand All @@ -53,7 +55,7 @@ bool TTieringProcessContext::AddPortion(
}
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(info);
}
it->second.back().MutableTxWriteVolume() += info->GetTxVolume();
it->second.back().TakePortionInTx(info, VersionedIndex);
if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) {
AFL_VERIFY(dWait);
Counters.OnPortionToDrop(info->GetTotalBlobBytes(), *dWait);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,34 @@ class TTaskConstructor {
YDB_READONLY_DEF(std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>, MemoryPredictor);
YDB_READONLY_DEF(std::shared_ptr<TTTLColumnEngineChanges>, Task);
YDB_ACCESSOR(ui64, MemoryUsage, 0);
YDB_ACCESSOR(ui64, TxWriteVolume, 0);
YDB_READONLY(ui64, PortionsCount, 0);
YDB_READONLY(ui64, ChunksCount, 0);

public:
TTaskConstructor(const std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>& predictor, const std::shared_ptr<TTTLColumnEngineChanges>& task)
: MemoryPredictor(predictor)
, Task(task) {

}

bool CanTakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
if (!PortionsCount) {
return true;
}
return
(PortionsCount + 1 < 1000) &&
(ChunksCount + portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount()) < 100000);
}

void TakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
++PortionsCount;
ChunksCount += portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount());
}
};

class TTieringProcessContext {
private:
const TVersionedIndex& VersionedIndex;
THashSet<TPortionAddress> UsedPortions;
const ui64 MemoryUsageLimit;
TSaverContext SaverContext;
Expand Down Expand Up @@ -63,7 +80,8 @@ class TTieringProcessContext {
}
}

TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager,
TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller);
};

Expand Down
Loading

0 comments on commit 1c441a9

Please sign in to comment.