Skip to content

Commit

Permalink
Merge 3be78ef into c6e9a17
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 14, 2024
2 parents c6e9a17 + 3be78ef commit 47cd8c5
Show file tree
Hide file tree
Showing 17 changed files with 378 additions and 46 deletions.
20 changes: 13 additions & 7 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class TTxUpdateSchema: public TTransactionBase<TColumnShard> {
};

bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"process", "TTxUpdateSchema::Execute");
ACFL_INFO("step", "TTxUpdateSchema.Execute_Start")("details", Self->NormalizerController.DebugString());

while (!Self->NormalizerController.IsNormalizationFinished()) {
Expand All @@ -153,6 +153,8 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxUpdateSchema::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxUpdateSchema::Complete");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete");
Self->Counters.GetCSCounters().Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant);
if (NormalizerTasks.empty()) {
Expand Down Expand Up @@ -190,26 +192,26 @@ class TTxApplyNormalizer: public TTransactionBase<TColumnShard> {

bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "TTxApplyNormalizer::Execute");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString());
if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) {
return false;
}

if (Self->NormalizerController.GetNormalizer()->GetActiveTasksCount() == 1) {
if (Self->NormalizerController.GetNormalizer()->DecActiveCounters() == 1) {
NIceDb::TNiceDb db(txc.DB);
Self->NormalizerController.OnNormalizerFinished(db);
}
return true;
}

void TTxApplyNormalizer::Complete(const TActorContext& ctx) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Complete")("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"event", "TTxApplyNormalizer::Complete");
AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "apply_normalizer_changes")(
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")(
"details", Self->NormalizerController.DebugString())("size", Changes->GetSize());
Changes->ApplyOnComplete(Self->NormalizerController);
Self->NormalizerController.GetNormalizer()->OnResultReady();
if (Self->NormalizerController.GetNormalizer()->HasActiveTasks()) {
return;
}
Expand Down Expand Up @@ -240,6 +242,8 @@ class TTxInitSchema: public TTransactionBase<TColumnShard> {
};

bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"process", "TTxInitSchema::Execute");
LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID());

const bool isFirstRun = txc.DB.GetScheme().IsEmpty();
Expand Down Expand Up @@ -286,6 +290,8 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxInitSchema::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxInitSchema::Complete");
Self->Counters.GetCSCounters().Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant);
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
Self->Execute(new TTxUpdateSchema(Self), ctx);
Expand Down
18 changes: 13 additions & 5 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1279,9 +1279,17 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
bool reask = false;
for (auto&& i : PortionsByPath) {
for (auto&& p : i.second) {
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
}
}
{
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
}
}
}
}
Expand All @@ -1295,12 +1303,12 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
std::vector<NOlap::TColumnChunkLoadContextV1> records;
std::vector<NOlap::TIndexChunkLoadContext> indexes;
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
}
while (!rowset.EndOfSet()) {
records.emplace_back(NOlap::TColumnChunkLoadContextV1(rowset));
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
if (!rowset.Next()) {
return false;
}
Expand Down
35 changes: 34 additions & 1 deletion ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,15 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<PathId, PortionId, SSColumnId, ChunkIdx, Metadata, BlobIdx, Offset, Size>;
};

struct IndexColumnsV2: Table<ColumnsV1TableId> {
struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {};
struct Metadata: Column<3, NScheme::NTypeIds::String> {};

using TKey = TableKey<PathId, PortionId>;
using TColumns = TableColumns<PathId, PortionId, Metadata>;
};

using TTables = SchemaTables<
Value,
TxInfo,
Expand Down Expand Up @@ -607,7 +616,8 @@ struct Schema : NIceDb::Schema {
TxDependencies,
TxStates,
TxEvents,
IndexColumnsV1
IndexColumnsV1,
IndexColumnsV2
>;

//
Expand Down Expand Up @@ -997,6 +1007,29 @@ class TColumnChunkLoadContextV1 {
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);

public:
template <class TSource>
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
for (auto&& i : metaProto.GetChunks()) {
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetMetadata());
records.emplace_back(std::move(result));
}
}

NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
NKikimrTxColumnShard::TColumnChunkInfo proto;
proto.SetSSColumnId(Address.GetColumnId());
proto.SetChunkIdx(Address.GetChunkIdx());
*proto.MutableMetadata() = MetaProto;
*proto.MutableBlobRangeLink() = BlobRange.SerializeToProto();
return proto;
}

TFullChunkAddress GetFullChunkAddress() const {
return TFullChunkAddress(PathId, PortionId, Address.GetEntityId(), Address.GetChunkIdx());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
namespace NKikimr::NOlap::NActualizer {

TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
: MemoryUsageLimit(memoryUsageLimit)
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
: VersionedIndex(versionedIndex)
, MemoryUsageLimit(memoryUsageLimit)
, SaverContext(saverContext)
, Counters(counters)
, Controller(controller)
Expand All @@ -31,10 +33,10 @@ bool TTieringProcessContext::AddPortion(
};
auto it = Tasks.find(features.GetRWAddress());
if (it == Tasks.end()) {
std::vector<TTaskConstructor> tasks = {buildNewTask()};
std::vector<TTaskConstructor> tasks = { buildNewTask() };
it = Tasks.emplace(features.GetRWAddress(), std::move(tasks)).first;
}
if (it->second.back().GetTxWriteVolume() + info->GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2 && it->second.back().GetTxWriteVolume()) {
if (!it->second.back().CanTakePortionInTx(info, VersionedIndex)) {
if (Controller->IsNewTaskAvailable(it->first, it->second.size())) {
it->second.emplace_back(buildNewTask());
} else {
Expand All @@ -53,7 +55,7 @@ bool TTieringProcessContext::AddPortion(
}
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(info);
}
it->second.back().MutableTxWriteVolume() += info->GetTxVolume();
it->second.back().TakePortionInTx(info, VersionedIndex);
if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) {
AFL_VERIFY(dWait);
Counters.OnPortionToDrop(info->GetTotalBlobBytes(), *dWait);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,34 @@ class TTaskConstructor {
YDB_READONLY_DEF(std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>, MemoryPredictor);
YDB_READONLY_DEF(std::shared_ptr<TTTLColumnEngineChanges>, Task);
YDB_ACCESSOR(ui64, MemoryUsage, 0);
YDB_ACCESSOR(ui64, TxWriteVolume, 0);
YDB_READONLY(ui64, PortionsCount, 0);
YDB_READONLY(ui64, ChunksCount, 0);

public:
TTaskConstructor(const std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>& predictor, const std::shared_ptr<TTTLColumnEngineChanges>& task)
: MemoryPredictor(predictor)
, Task(task) {

}

bool CanTakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
if (!PortionsCount) {
return true;
}
return
(PortionsCount + 1 < 1000) &&
(ChunksCount + portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount()) < 100000);
}

void TakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
++PortionsCount;
ChunksCount += portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount());
}
};

class TTieringProcessContext {
private:
const TVersionedIndex& VersionedIndex;
THashSet<TPortionAddress> UsedPortions;
const ui64 MemoryUsageLimit;
TSaverContext SaverContext;
Expand Down Expand Up @@ -63,7 +80,8 @@ class TTieringProcessContext {
}
}

TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager,
TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller);
};

Expand Down
18 changes: 11 additions & 7 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,13 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
std::shared_ptr<TCleanupPortionsColumnEngineChanges> changes = std::make_shared<TCleanupPortionsColumnEngineChanges>(StoragesManager);

// Add all portions from dropped paths
ui64 txSize = 0;
const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4;
ui64 portionsCount = 0;
ui64 chunksCount = 0;
ui32 skipLocked = 0;
ui32 portionsFromDrop = 0;
bool limitExceeded = false;
const ui32 maxChunksCount = 100000;
const ui32 maxPortionsCount = 1000;
for (ui64 pathId : pathsToDrop) {
auto g = GranulesStorage->GetGranuleOptional(pathId);
if (!g) {
Expand All @@ -335,8 +337,9 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
++skipLocked;
continue;
}
if (txSize + info->GetTxVolume() < txSizeLimit || changes->GetPortionsToDrop().empty()) {
txSize += info->GetTxVolume();
++portionsCount;
chunksCount += info->GetApproxChunksCount(info->GetSchema(VersionedIndex)->GetColumnsCount());
if ((portionsCount < maxPortionsCount && chunksCount < maxChunksCount) || changes->GetPortionsToDrop().empty()) {
} else {
limitExceeded = true;
break;
Expand All @@ -360,8 +363,9 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
continue;
}
AFL_VERIFY(it->second[i]->CheckForCleanup(snapshot))("p_snapshot", it->second[i]->GetRemoveSnapshotOptional())("snapshot", snapshot);
if (txSize + it->second[i]->GetTxVolume() < txSizeLimit || changes->GetPortionsToDrop().empty()) {
txSize += it->second[i]->GetTxVolume();
++portionsCount;
chunksCount += it->second[i]->GetApproxChunksCount(it->second[i]->GetSchema(VersionedIndex)->GetColumnsCount());
if ((portionsCount < maxPortionsCount && chunksCount < maxChunksCount) || changes->GetPortionsToDrop().empty()) {
} else {
limitExceeded = true;
break;
Expand Down Expand Up @@ -397,7 +401,7 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size());

TSaverContext saverContext(StoragesManager);
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, SignalCounters, ActualizationController);
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, VersionedIndex, SignalCounters, ActualizationController);
const TDuration actualizationLag = NYDBTest::TControllers::GetColumnShardController()->GetActualizationTasksLag();
for (auto&& i : pathEviction) {
auto g = GetGranuleOptional(i.first);
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/column_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ class TColumnRecord {
return BlobRange;
}

NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
NKikimrTxColumnShard::TColumnChunkInfo result;
result.SetSSColumnId(GetEntityId());
result.SetChunkIdx(GetChunkIdx());
result.SetMetadata(Meta.SerializeToProto());
*result.MutableBlobRangeLink() = BlobRange.SerializeToProto();
return result;
}
NKikimrColumnShardDataSharingProto::TColumnRecord SerializeToProto() const;
static TConclusion<TColumnRecord> BuildFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto) {
TColumnRecord result;
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ void TPortionDataAccessor::SaveToDatabase(IDbWrapper& db, const ui32 firstPKColu
FullValidation();
db.WritePortion(*PortionInfo);
if (!saveOnlyMeta) {
NKikimrTxColumnShard::TIndexPortionAccessor protoData;
for (auto& record : GetRecordsVerified()) {
*protoData.AddChunks() = record.SerializeToDBProto();
}
db.WriteColumns(*PortionInfo, std::move(protoData));

for (auto& record : GetRecordsVerified()) {
db.WriteColumn(*PortionInfo, record, firstPKColumnId);
}
Expand Down Expand Up @@ -533,7 +539,7 @@ void TPortionDataAccessor::FullValidation() const {
blobIdxs.emplace(bRange->GetBlobIdxVerified());
}
}
AFL_VERIFY(blobIdxs.size());
AFL_VERIFY(blobIdxs.size())("portion_info", PortionInfo->DebugString());
AFL_VERIFY(PortionInfo->GetBlobIdsCount() == blobIdxs.size());
AFL_VERIFY(PortionInfo->GetBlobIdsCount() == *blobIdxs.rbegin() + 1);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ ui64 TPortionInfo::GetMetadataMemorySize() const {
return sizeof(TPortionInfo) - sizeof(TPortionMeta) + Meta.GetMetadataMemorySize();
}

ui64 TPortionInfo::GetTxVolume() const {
return 1024;
ui64 TPortionInfo::GetApproxChunksCount(const ui32 schemaColumnsCount) const {
return schemaColumnsCount * 256 * (GetRecordsCount() / 10000 + 1);
}

void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ class TPortionInfo {
const TString& GetIndexStorageId(const ui32 columnId, const TIndexInfo& indexInfo) const;
const TString& GetEntityStorageId(const ui32 entityId, const TIndexInfo& indexInfo) const;

ui64 GetTxVolume() const; // fake-correct method for determ volume on rewrite this portion in transaction progress
ui64 GetTxVolume() const {
return 1024;
}

ui64 GetApproxChunksCount(const ui32 schemaColumnsCount) const;
ui64 GetMetadataMemorySize() const;

void SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const;
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/engines/protos/portion_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ message TIndexColumnMeta {
optional NKikimrSSA.TProgram.TConstant MaxValue = 4;
optional TIndexPortionMeta PortionMeta = 5[deprecated = true]; // First PK column could contain portion info
}

message TColumnChunkInfo {
optional uint32 SSColumnId = 1;
optional uint32 ChunkIdx = 2;
optional TIndexColumnMeta Metadata = 3;
optional TBlobRangeLink16 BlobRangeLink = 4;
}

message TIndexPortionAccessor {
repeated TColumnChunkInfo Chunks = 1;
}
Loading

0 comments on commit 47cd8c5

Please sign in to comment.