Skip to content

Commit

Permalink
Merge b4edaa4 into e6f71f0
Browse files Browse the repository at this point in the history
  • Loading branch information
aavdonkin authored Oct 21, 2024
2 parents e6f71f0 + b4edaa4 commit 3783590
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 4 deletions.
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/columnshard_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace NKikimr::NColumnShard {
bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) {
auto rowset = db.Table<InsertTable>().Select();
if (!rowset.IsReady()) {
LOG_S_CRIT("Load: insert table rowset is not ready");
return false;
}

Expand All @@ -25,6 +26,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
break;
}
if (!rowset.Next()) {
LOG_S_CRIT("Load: insert table next failed");
return false;
}
}
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -882,17 +882,21 @@ struct Schema : NIceDb::Schema {

static bool IndexCounters_Load(NIceDb::TNiceDb& db, const std::function<void(ui32 id, ui64 value)>& callback) {
auto rowset = db.Table<IndexCounters>().Prefix(0).Select();
if (!rowset.IsReady())
if (!rowset.IsReady()) {
LOG_S_CRIT("Load: index counters rowset is not ready");
return false;
}

while (!rowset.EndOfSet()) {
ui32 id = rowset.GetValue<IndexCounters::Counter>();
ui64 value = rowset.GetValue<IndexCounters::ValueUI64>();

callback(id, value);

if (!rowset.Next())
if (!rowset.Next()) {
LOG_S_CRIT("Load: index counters next failed");
return false;
}
}
return true;
}
Expand Down
62 changes: 62 additions & 0 deletions ydb/core/tx/columnshard/counters/common_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,66 @@ class TDataOwnerSignals: public TCommonCountersOwner {

};

class TLoadTimeSignals: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr TablesLoadingTimeCounter;
NMonitoring::TDynamicCounters::TCounterPtr SchemaPresetLoadingTimeCounter;
NMonitoring::TDynamicCounters::TCounterPtr TableVersionsLoadingTimeCounter;
NMonitoring::TDynamicCounters::TCounterPtr SchemaPresetVersionsLoadingTimeCounter;

NMonitoring::TDynamicCounters::TCounterPtr TablesLoadingFailCounter;
NMonitoring::TDynamicCounters::TCounterPtr SchemaPresetLoadingFailCounter;
NMonitoring::TDynamicCounters::TCounterPtr TableVersionsLoadingFailCounter;
NMonitoring::TDynamicCounters::TCounterPtr SchemaPresetVersionsLoadingFailCounter;


public:
TLoadTimeSignals()
: TBase("Startup")
{
TablesLoadingTimeCounter = TBase::GetValue("Startup/TablesLoadingTime");;
SchemaPresetLoadingTimeCounter = TBase::GetValue("Startup/SchemaPresetLoadingTime");;
TableVersionsLoadingTimeCounter = TBase::GetValue("Startup/TableVersionsLoadingTime");;
SchemaPresetVersionsLoadingTimeCounter = TBase::GetValue("Startup/SchemaPreseVersionstLoadingTime");;

TablesLoadingFailCounter = TBase::GetValue("Startup/TablesLoadingFailCount");;
SchemaPresetLoadingFailCounter = TBase::GetValue("Startup/SchemaPresetLoadingFailCount");;
TableVersionsLoadingFailCounter = TBase::GetValue("Startup/TableVersionsLoadingFailCount");;
SchemaPresetVersionsLoadingFailCounter = TBase::GetValue("Startup/SchemaPreseVersionstLoadingFailCount");;
}

void AddTablesLoadingTime(ui64 microSeconds) {
TablesLoadingTimeCounter->Add(microSeconds);
}

void AddSchemaPresetLoadingTime(ui64 microSeconds) {
SchemaPresetLoadingTimeCounter->Add(microSeconds);
}

void AddTableVersionsLoadingTime(ui64 microSeconds) {
TableVersionsLoadingTimeCounter->Add(microSeconds);
}

void AddSchemaPresetVersionsLoadingTime(ui64 microSeconds) {
SchemaPresetVersionsLoadingTimeCounter->Add(microSeconds);
}

void AddLoadingTablesFail() {
TablesLoadingFailCounter->Add(1);
}

void AddLoadingSchemaPresetFail() {
SchemaPresetLoadingFailCounter->Add(1);
}

void AddLoadingTableVersionsFail() {
TableVersionsLoadingFailCounter->Add(1);
}

void AddLoadingSchemaPresetVersionsFail() {
SchemaPresetVersionsLoadingFailCounter->Add(1);
}
};

}
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/counters/engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ TEngineLogsCounters::TEngineLogsCounters()

GranuleOptimizerLocked = TBase::GetDeriviative("Optimizer/Granules/Locked");

PortionsLoadingTimeCounter = TBase::GetValue("Startup/PortionsLoadingTime");
ColumnsLoadingTimeCounter = TBase::GetValue("Startup/ColumnsLoadingTime");
IndexesLoadingTimeCounter = TBase::GetValue("Startup/IndexesLoadingTime");
LoadPortionsFailCounter = TBase::GetValue("Startup/LoadPortionFailCount");
LoadColumnsFailCounter = TBase::GetValue("Startup/LoadColumnFailCount");
LoadIndexFailCounter = TBase::GetValue("Startup/LoadIndexFailCount");

IndexMetadataUsageBytes = TBase::GetValue("IndexMetadata/Usage/Bytes");

StatUsageForTTLCount = TBase::GetDeriviative("Ttl/StatUsageForTTLCount/Count");
Expand Down
32 changes: 32 additions & 0 deletions ydb/core/tx/columnshard/counters/engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ class TEngineLogsCounters: public TCommonCountersOwner {

NMonitoring::TDynamicCounters::TCounterPtr IndexMetadataUsageBytes;

NMonitoring::TDynamicCounters::TCounterPtr PortionsLoadingTimeCounter;
NMonitoring::TDynamicCounters::TCounterPtr ColumnsLoadingTimeCounter;
NMonitoring::TDynamicCounters::TCounterPtr IndexesLoadingTimeCounter;
NMonitoring::TDynamicCounters::TCounterPtr LoadPortionsFailCounter;
NMonitoring::TDynamicCounters::TCounterPtr LoadColumnsFailCounter;
NMonitoring::TDynamicCounters::TCounterPtr LoadIndexFailCounter;

TAgentGranuleDataCounters GranuleDataAgent;
std::vector<std::shared_ptr<TIncrementalHistogram>> BlobSizeDistribution;
std::vector<std::shared_ptr<TIncrementalHistogram>> PortionSizeDistribution;
Expand Down Expand Up @@ -329,6 +336,31 @@ class TEngineLogsCounters: public TCommonCountersOwner {
GranuleOptimizerLocked->Add(1);
}

void AddPortionLoadingTime(ui64 microSeconds) const {
PortionsLoadingTimeCounter->Add(microSeconds);
}

void AddColumnLoadingTime(ui64 microSeconds) const {
ColumnsLoadingTimeCounter->Add(microSeconds);
}

void AddIndexesLoadingTime(ui64 microSeconds) const {
IndexesLoadingTimeCounter->Add(microSeconds);
}

void AddLoadPortionsFail() const {
LoadPortionsFailCounter->Add(1);
}

void AddLoadColumnsFail() const {
LoadColumnsFailCounter->Add(1);
}

void AddLoadIndexFail() const {
LoadIndexFailCounter->Add(1);
}


TEngineLogsCounters();
};

Expand Down
24 changes: 24 additions & 0 deletions ydb/core/tx/columnshard/counters/insert_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,28 @@ class TInsertTableCounters: public TCommonCountersOwner {
}
};

class TInsertTableLoadCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;

NMonitoring::TDynamicCounters::TCounterPtr InsertTableLoadingTimeCounter;
NMonitoring::TDynamicCounters::TCounterPtr InsertTableLoadingFailCounter;

public:
TInsertTableLoadCounters()
: TBase("InsertTableLoad")
{
InsertTableLoadingTimeCounter = TBase::GetValue("Startup/InsertTableLoadingTime");;
InsertTableLoadingFailCounter = TBase::GetValue("Startup/InsertTableLoadFails");;
}

void AddInsertTableLoadingTime(ui64 microSeconds) {
InsertTableLoadingTimeCounter->Add(microSeconds);
}

void AddInsertTableLoadFail() {
InsertTableLoadingFailCounter->Add(1);
}
};

}
16 changes: 16 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,21 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) {

bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
TPortionConstructors constructors;
TInstant start = TInstant::Now();
{
TMemoryProfileGuard g("TTxInit/LoadColumns/Portions");
if (!db.LoadPortions([&](TPortionInfoConstructor&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) {
const TIndexInfo& indexInfo = portion.GetSchema(VersionedIndex)->GetIndexInfo();
AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo));
AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion)));
})) {
SignalCounters.AddLoadPortionsFail();
return false;
}
}

TInstant portionsLoaded = TInstant::Now();

{
TMemoryProfileGuard g("TTxInit/LoadColumns/Records");
TPortionInfo::TSchemaCursor schema(VersionedIndex);
Expand All @@ -226,18 +230,30 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
auto* constructor = constructors.MergeConstructor(std::move(portion));
constructor->LoadRecord(currentSchema->GetIndexInfo(), loadContext);
})) {
SignalCounters.AddLoadColumnsFail();
return false;
}
}

TInstant columnsLoaded = TInstant::Now();

{
TMemoryProfileGuard g("TTxInit/LoadColumns/Indexes");
if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) {
auto* constructor = constructors.GetConstructorVerified(pathId, portionId);
constructor->LoadIndex(loadContext);
})) {
SignalCounters.AddLoadIndexFail();
return false;
};
}

TInstant indexesLoaded = TInstant::Now();
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "load_columns")("portions_loading_time", portionsLoaded - start)("columns_loading_time", columnsLoaded - portionsLoaded)("indexes_loading_time", indexesLoaded - columnsLoaded);
SignalCounters.AddPortionLoadingTime((portionsLoaded - start).MicroSeconds());
SignalCounters.AddColumnLoadingTime((columnsLoaded - portionsLoaded).MicroSeconds());
SignalCounters.AddIndexesLoadingTime((indexesLoaded - columnsLoaded).MicroSeconds());

{
TMemoryProfileGuard g("TTxInit/LoadColumns/Constructors");
for (auto&& [granuleId, pathConstructors] : constructors) {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ bool TDbWrapper::LoadColumns(const std::function<void(NOlap::TPortionInfoConstru
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto rowset = db.Table<IndexColumns>().Prefix(0).Select();
if (!rowset.IsReady()) {
LOG_S_CRIT("Load: index columns is not ready");
return false;
}

Expand All @@ -118,6 +119,7 @@ bool TDbWrapper::LoadColumns(const std::function<void(NOlap::TPortionInfoConstru
callback(std::move(constructor), chunkLoadContext);

if (!rowset.Next()) {
LOG_S_CRIT("Load: index columns next failed");
return false;
}
}
Expand All @@ -129,6 +131,7 @@ bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfoConstr
using IndexPortions = NColumnShard::Schema::IndexPortions;
auto rowset = db.Table<IndexPortions>().Select();
if (!rowset.IsReady()) {
LOG_S_CRIT("Load: index portions is not ready");
return false;
}

Expand Down Expand Up @@ -161,6 +164,7 @@ bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfoConstr
callback(std::move(portion), metaProto);

if (!rowset.Next()) {
LOG_S_CRIT("Load: index portions next failed");
return false;
}
}
Expand Down Expand Up @@ -198,6 +202,7 @@ bool TDbWrapper::LoadIndexes(const std::function<void(const ui64 pathId, const u
using IndexIndexes = NColumnShard::Schema::IndexIndexes;
auto rowset = db.Table<IndexIndexes>().Select();
if (!rowset.IsReady()) {
LOG_S_CRIT("Load: index indexes is not ready");
return false;
}

Expand All @@ -206,6 +211,7 @@ bool TDbWrapper::LoadIndexes(const std::function<void(const ui64 pathId, const u
callback(rowset.GetValue<IndexIndexes::PathId>(), rowset.GetValue<IndexIndexes::PortionId>(), chunkLoadContext);

if (!rowset.Next()) {
LOG_S_CRIT("Load: index indexes next failed");
return false;
}
}
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,18 @@ bool TInsertTable::Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant
Loaded = true;
LastWriteId = (TInsertWriteId)0;
if (!NColumnShard::Schema::GetSpecialValueOpt(db, NColumnShard::Schema::EValueIds::LastWriteId, LastWriteId)) {
LoadCounters.AddInsertTableLoadFail();
return false;
}

return dbTable.Load(*this, loadTime);
TInstant start = TInstant::Now();
bool result = dbTable.Load(*this, loadTime);
TInstant finish = TInstant::Now();
if (!result) {
LoadCounters.AddInsertTableLoadFail();
}
LoadCounters.AddInsertTableLoadingTime((finish - start).MicroSeconds());
return result;
}

std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TInsertTableAccessor {
protected:
TInsertionSummary Summary;
THashMap<TUnifiedBlobId, ui32> BlobLinks;
NColumnShard::TInsertTableLoadCounters LoadCounters;

void AddBlobLink(const TUnifiedBlobId& blobId) {
++BlobLinks[blobId];
Expand Down
Loading

0 comments on commit 3783590

Please sign in to comment.