Skip to content

Commit

Permalink
Cs to stable (#10390)
Browse files Browse the repository at this point in the history
Co-authored-by: Artem Alekseev <fexolm@ydb.tech>
Co-authored-by: ivanmorozov333 <ivanmorozov@ydb.tech>
Co-authored-by: Alexander Avdonkin <aavdonkin@yandex.ru>
Co-authored-by: Semyon <yentsovsemyon@ydb.tech>
  • Loading branch information
5 people authored Oct 14, 2024
1 parent 9d9b5d6 commit cda9edf
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 18 deletions.
10 changes: 8 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,14 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() {

void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType,
const TPortionInfo* exPortionInfo) {
UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo);

if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD)) {
auto before = Counters.Active();
UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo);
auto after = Counters.Active();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_stats_updated")("type", updateType)("path_id", portionInfo.GetPathId())("portion", portionInfo.GetPortionId())("before_size", before.Bytes)("after_size", after.Bytes)("before_rows", before.Rows)("after_rows", after.Rows);
} else {
UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo);
}
const ui64 pathId = portionInfo.GetPathId();
Y_ABORT_UNLESS(pathId);
if (!PathStats.contains(pathId)) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/engines/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ SRCS(
defs.cpp
)

GENERATE_ENUM_SERIALIZATION(column_engine_logs.h)

PEERDIR(
contrib/libs/apache/arrow
ydb/core/base
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/limiter/grouped_memory/service/allocation.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>

namespace NKikimr::NOlap::NGroupedMemoryManager {
Expand All @@ -9,7 +10,7 @@ enum class EAllocationStatus {
Failed
};

class TAllocationInfo {
class TAllocationInfo: public NColumnShard::TMonitoringObjectsCounter<TAllocationInfo> {
private:
std::shared_ptr<IAllocation> Allocation;
YDB_READONLY(ui64, AllocationInternalGroupId, 0);
Expand All @@ -25,7 +26,7 @@ class TAllocationInfo {
if (GetAllocationStatus() != EAllocationStatus::Failed) {
Stage->Free(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated);
}

AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName());
}

Expand Down Expand Up @@ -69,8 +70,8 @@ class TAllocationInfo {
}
}

TAllocationInfo(const ui64 processId, const ui64 scopeId, const ui64 allocationInternalGroupId, const std::shared_ptr<IAllocation>& allocation,
const std::shared_ptr<TStageFeatures>& stage);
TAllocationInfo(const ui64 processId, const ui64 scopeId, const ui64 allocationInternalGroupId,
const std::shared_ptr<IAllocation>& allocation, const std::shared_ptr<TStageFeatures>& stage);
};

} // namespace NKikimr::NOlap::NGroupedMemoryManager
4 changes: 3 additions & 1 deletion ydb/core/tx/limiter/grouped_memory/service/group.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#pragma once
#include "allocation.h"

#include <ydb/core/tx/columnshard/counters/common/object_counter.h>

namespace NKikimr::NOlap::NGroupedMemoryManager {

class TProcessMemoryScope;

class TGrouppedAllocations {
class TGrouppedAllocations: public NColumnShard::TMonitoringObjectsCounter<TGrouppedAllocations> {
private:
THashMap<ui64, std::shared_ptr<TAllocationInfo>> Allocations;

Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/limiter/grouped_memory/service/ids.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ ui64 TIdsControl::ExtractInternalIdVerified(const ui64 externalId) {
return result;
}

std::optional<ui64> TIdsControl::ExtractInternalIdOptional(const ui64 externalId) {
auto it = ExternalIdIntoInternalId.find(externalId);
if (it == ExternalIdIntoInternalId.end()) {
return std::nullopt;
}
const ui64 result = it->second;
InternalIdIntoExternalId.erase(result);
ExternalIdIntoInternalId.erase(it);
return result;
}

std::optional<ui64> TIdsControl::GetInternalIdOptional(const ui64 externalId) const {
auto it = ExternalIdIntoInternalId.find(externalId);
if (it != ExternalIdIntoInternalId.end()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/limiter/grouped_memory/service/ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class TIdsControl {
}

[[nodiscard]] ui64 ExtractInternalIdVerified(const ui64 externalId);
[[nodiscard]] std::optional<ui64> ExtractInternalIdOptional(const ui64 externalId);

ui64 GetMinInternalIdVerified() const;
ui64 GetExternalIdVerified(const ui64 internalId) const;
Expand Down
31 changes: 20 additions & 11 deletions ydb/core/tx/limiter/grouped_memory/service/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
#include "group.h"
#include "ids.h"

#include <ydb/core/tx/columnshard/counters/common/object_counter.h>

#include <ydb/library/accessor/validator.h>

namespace NKikimr::NOlap::NGroupedMemoryManager {

class TProcessMemoryScope {
class TProcessMemoryScope: public NColumnShard::TMonitoringObjectsCounter<TProcessMemoryScope> {
private:
const ui64 ExternalProcessId;
const ui64 ExternalScopeId;
Expand Down Expand Up @@ -63,6 +65,8 @@ class TProcessMemoryScope {
}
GroupIds.Clear();
AllocationInfo.clear();
AFL_INFO(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "scope_cleaned")("process_id", ExternalProcessId)(
"external_scope_id", ExternalScopeId);
return true;
}

Expand Down Expand Up @@ -106,7 +110,11 @@ class TProcessMemoryScope {
bool UnregisterAllocation(const ui64 allocationId) {
ui64 memoryAllocated = 0;
auto it = AllocationInfo.find(allocationId);
AFL_VERIFY(it != AllocationInfo.end());
if (it == AllocationInfo.end()) {
AFL_WARN(NKikimrServices::GROUPED_MEMORY_LIMITER)("reason", "allocation_cleaned_in_previous_scope_id_live")(
"allocation_id", allocationId)("process_id", ExternalProcessId)("external_scope_id", ExternalScopeId);
return true;
}
bool waitFlag = false;
const ui64 internalGroupId = it->second->GetAllocationInternalGroupId();
switch (it->second->GetAllocationStatus()) {
Expand All @@ -127,12 +135,15 @@ class TProcessMemoryScope {
}

void UnregisterGroup(const bool isPriorityProcess, const ui64 externalGroupId) {
const ui64 internalGroupId = GroupIds.ExtractInternalIdVerified(externalGroupId);
AFL_INFO(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "remove_group")("external_group_id", externalGroupId)(
"internal_group_id", internalGroupId);
UnregisterGroupImpl(internalGroupId);
if (isPriorityProcess && (internalGroupId < GroupIds.GetMinInternalIdDef(internalGroupId))) {
Y_UNUSED(TryAllocateWaiting(isPriorityProcess, 0));
if (auto internalGroupId = GroupIds.ExtractInternalIdOptional(externalGroupId)) {
AFL_INFO(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "remove_group")("external_group_id", externalGroupId)(
"internal_group_id", internalGroupId);
UnregisterGroupImpl(*internalGroupId);
if (isPriorityProcess && (*internalGroupId < GroupIds.GetMinInternalIdDef(*internalGroupId))) {
Y_UNUSED(TryAllocateWaiting(isPriorityProcess, 0));
}
} else {
AFL_WARN(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "remove_absent_group")("external_group_id", externalGroupId);
}
}

Expand All @@ -141,7 +152,7 @@ class TProcessMemoryScope {
}
};

class TProcessMemory {
class TProcessMemory: public NColumnShard::TMonitoringObjectsCounter<TProcessMemory> {
private:
const ui64 ExternalProcessId;

Expand Down Expand Up @@ -214,7 +225,6 @@ class TProcessMemory {
if (it->second->Unregister()) {
AllocationScopes.erase(it);
}

}

void RegisterScope(const ui64 externalScopeId) {
Expand All @@ -224,7 +234,6 @@ class TProcessMemory {
} else {
it->second->Register();
}

}

void SetPriorityProcess() {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
}

TShardIdx shardIdx = Self->TabletIdToShardIdx[datashardId];
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"PersistSingleStats for pathId " << pathId.LocalPathId << " shard idx " << shardIdx << " data size " << dataSize << " row count " << rowCount
);
const auto* shardInfo = Self->ShardInfos.FindPtr(shardIdx);
if (!shardInfo) {
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand Down

0 comments on commit cda9edf

Please sign in to comment.