Skip to content

Commit

Permalink
Merge c978b4b into e9534cc
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 3, 2024
2 parents e9534cc + c978b4b commit 2200581
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 22 deletions.
13 changes: 13 additions & 0 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include "columnshard_impl.h"
#include "common/limits.h"
#include "blobs_action/transaction/tx_write.h"
#include "blobs_action/transaction/tx_draft.h"
#include "counters/columnshard.h"
#include "engines/column_engine_logs.h"
#include "operations/batch_builder/builder.h"
#include "operations/write_data.h"

Expand Down Expand Up @@ -193,6 +195,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
return returnFail(COUNTER_WRITE_FAIL);
}

const ui64 minMemoryRead = TablesManager.GetPrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>()
.GetGranuleVerified(writeMeta.GetTableId())
.GetPortionsIndex()
.GetMinMemoryRead();
if (NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit < minMemoryRead) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "overlimit")("reason", "read_memory")("current", minMemoryRead)(
"limit", NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit)("table_id", writeMeta.GetTableId());
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::OverlimitReadMemory);
return returnFail(COUNTER_WRITE_FAIL);
}

const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
auto arrowData = std::make_shared<TProtoArrowData>(snapshotSchema);
if (!arrowData->ParseFromProto(record)) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/common/limits.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ class TGlobalLimits {
static constexpr inline ui64 InsertCompactionMemoryLimit = 1ULL << 30;
static constexpr inline ui64 GeneralCompactionMemoryLimit = 3ULL << 30;
static constexpr inline ui64 ScanMemoryLimit = 3ULL << 30;

static constexpr inline ui64 DefaultRejectMemoryIntervalLimit = ScanMemoryLimit;
static constexpr inline ui64 DefaultReduceMemoryIntervalLimit = 0.8 * ScanMemoryLimit;
static constexpr inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20;
};
}
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/common/ya.make
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
LIBRARY()

SRCS(
limits.h
limits.cpp
reverse_accessor.cpp
scalars.cpp
snapshot.cpp
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ enum class EWriteFailReason {
LongTxDuplication /* "long_tx_duplication" */,
NoTable /* "no_table" */,
IncorrectSchema /* "incorrect_schema" */,
Overload /* "overload" */
Overload /* "overload" */,
OverlimitReadMemory /* "overlimit_read_memory" */
};

class TCSInitialization: public TCommonCountersOwner {
Expand Down
41 changes: 37 additions & 4 deletions ydb/core/tx/columnshard/counters/engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,32 @@ class TAgentDataClassCounters: public TCommonCountersOwner {
}
};

class TPortionsIndexCounters {
public:
const std::shared_ptr<TValueAggregationClient> MinReadBytes;
TPortionsIndexCounters(const std::shared_ptr<TValueAggregationClient>& minReadBytes)
: MinReadBytes(minReadBytes) {
}
};

class TGranuleDataCounters {
private:
const TDataClassCounters InsertedData;
const TDataClassCounters CompactedData;
const TDataClassCounters FullData;
const TPortionsIndexCounters PortionsIndexCounters;

public:
TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData)
const TPortionsIndexCounters& GetPortionsIndexCounters() const {
return PortionsIndexCounters;
}

TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData,
const std::shared_ptr<TValueAggregationClient>& minReadBytes)
: InsertedData(insertedData)
, CompactedData(compactedData)
, FullData(fullData)
{
, PortionsIndexCounters(minReadBytes) {
}

void OnPortionsDataRefresh(const TBaseGranuleDataClassSummary& inserted, const TBaseGranuleDataClassSummary& compacted) const {
Expand All @@ -105,20 +120,38 @@ class TGranuleDataCounters {
}
};

class TPortionsIndexAgentsCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;

public:
const std::shared_ptr<TValueAggregationAgent> MinReadBytes;

TPortionsIndexAgentsCounters(const TString& baseName)
: TBase(baseName)
, MinReadBytes(TBase::GetValueAutoAggregations("MinRead/Bytes")) {
}
};

class TAgentGranuleDataCounters {
private:
TAgentDataClassCounters InsertedData;
TAgentDataClassCounters CompactedData;
TAgentDataClassCounters FullData;
TPortionsIndexAgentsCounters PortionsIndex;

public:
TAgentGranuleDataCounters(const TString& ownerId)
: InsertedData(ownerId, "ByGranule/Inserted")
, CompactedData(ownerId, "ByGranule/Compacted")
, FullData(ownerId, "ByGranule/Full") {
, FullData(ownerId, "ByGranule/Full")
, PortionsIndex("ByGranule/PortionsIndex")
{
}

TGranuleDataCounters RegisterClient() const {
return TGranuleDataCounters(InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient());
return TGranuleDataCounters(
InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient(), PortionsIndex.MinReadBytes->GetClient());
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "columns_set.h"
#include "fetching.h"
#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
Expand Down Expand Up @@ -32,9 +33,9 @@ class TSpecialReadContext {
std::array<std::array<std::array<std::array<std::array<std::array<std::shared_ptr<TFetchingScript>, 2>, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts;

public:
static const inline ui64 DefaultRejectMemoryIntervalLimit = ((ui64)3) << 30;
static const inline ui64 DefaultReduceMemoryIntervalLimit = DefaultRejectMemoryIntervalLimit;
static const inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20;
static const inline ui64 DefaultRejectMemoryIntervalLimit = TGlobalLimits::DefaultRejectMemoryIntervalLimit;
static const inline ui64 DefaultReduceMemoryIntervalLimit = TGlobalLimits::DefaultReduceMemoryIntervalLimit;
static const inline ui64 DefaultReadSequentiallyBufferSize = TGlobalLimits::DefaultReadSequentiallyBufferSize;

const ui64 ReduceMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetReduceMemoryIntervalLimit(DefaultReduceMemoryIntervalLimit);
const ui64 RejectMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetRejectMemoryIntervalLimit(DefaultRejectMemoryIntervalLimit);
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ TGranuleMeta::TGranuleMeta(const ui64 pathId, const TGranulesStorage& owner, con
, PortionInfoGuard(owner.GetCounters().BuildPortionBlobsGuard())
, Stats(owner.GetStats())
, StoragesManager(owner.GetStoragesManager())
, PortionsIndex(*this)
{
, PortionsIndex(*this, Counters.GetPortionsIndexCounters()) {
NStorageOptimizer::IOptimizerPlannerConstructor::TBuildContext context(PathId, owner.GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetPrimaryKey());
OptimizerPlanner = versionedIndex.GetLastSchema()->GetIndexInfo().GetCompactionPlannerConstructor()->BuildPlanner(context).DetachResult();
AFL_VERIFY(!!OptimizerPlanner);
Expand Down
25 changes: 22 additions & 3 deletions ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ TPortionsIndex::TPortionIntervals TPortionsIndex::GetIntervalFeatures(const TPor
TPortionIntervals portionExcludeIntervals;
while (true) {
std::optional<NArrow::TReplaceKey> nextKey;
for (auto&& p : itFrom->second.GetPortionIds()) {
for (auto&& [p, _] : itFrom->second.GetPortionIds()) {
if (skipPortions.contains(p)) {
continue;
}
Expand Down Expand Up @@ -55,9 +55,12 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
auto itTo = Points.find(p->IndexKeyEnd());
AFL_VERIFY(itTo != Points.end());
{
const ui64 minMemoryRead = p->GetMinMemoryForReadColumns({});
auto it = itFrom;
while (true) {
it->second.RemoveContained(p->GetPortionId());
RemoveFromMemoryUsageControl(it->second.GetMinMemoryRead());
it->second.RemoveContained(p->GetPortionId(), minMemoryRead);
++CountMemoryUsages[it->second.GetMinMemoryRead()];
if (it == itTo) {
break;
}
Expand All @@ -67,19 +70,27 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
if (itFrom != itTo) {
itFrom->second.RemoveStart(p);
if (itFrom->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itFrom->second.GetMinMemoryRead());
Points.erase(itFrom);
}
itTo->second.RemoveFinish(p);
if (itTo->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itTo->second.GetMinMemoryRead());
Points.erase(itTo);
}
} else {
itTo->second.RemoveStart(p);
itTo->second.RemoveFinish(p);
if (itTo->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itTo->second.GetMinMemoryRead());
Points.erase(itTo);
}
}
if (CountMemoryUsages.size()) {
Counters.MinReadBytes->SetValue(CountMemoryUsages.rbegin()->first);
} else {
Counters.MinReadBytes->SetValue(0);
}
}

void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
Expand All @@ -89,13 +100,21 @@ void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
itTo->second.AddFinish(p);

auto it = itFrom;
const ui64 minMemoryRead = p->GetMinMemoryForReadColumns({});
while (true) {
it->second.AddContained(p->GetPortionId());
RemoveFromMemoryUsageControl(it->second.GetMinMemoryRead());
it->second.AddContained(p->GetPortionId(), minMemoryRead);
++CountMemoryUsages[it->second.GetMinMemoryRead()];
if (it == itTo) {
break;
}
AFL_VERIFY(++it != Points.end());
}
if (CountMemoryUsages.size()) {
Counters.MinReadBytes->SetValue(CountMemoryUsages.rbegin()->first);
} else {
Counters.MinReadBytes->SetValue(0);
}
}

}
46 changes: 38 additions & 8 deletions ydb/core/tx/columnshard/engines/storage/granule/portions_index.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>

namespace NKikimr::NOlap {
Expand All @@ -11,35 +12,44 @@ class TPortionsPKPoint {
private:
THashMap<ui64, std::shared_ptr<TPortionInfo>> Start;
THashMap<ui64, std::shared_ptr<TPortionInfo>> Finish;
THashSet<ui64> PortionIds;
THashMap<ui64, ui64> PortionIds;
YDB_READONLY(ui64, MinMemoryRead, 0);

public:
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetStart() const {
return Start;
}

void ProvidePortions(const TPortionsPKPoint& source) {
for (auto&& i : source.PortionIds) {
MinMemoryRead = 0;
for (auto&& [i, mem] : source.PortionIds) {
if (source.Finish.contains(i)) {
continue;
}
AFL_VERIFY(PortionIds.emplace(i).second);
AddContained(i, mem);
}
}

const THashSet<ui64>& GetPortionIds() const {
const THashMap<ui64, ui64>& GetPortionIds() const {
return PortionIds;
}

bool IsEmpty() const {
return Start.empty() && Finish.empty();
}

void AddContained(const ui64 portionId) {
AFL_VERIFY(PortionIds.emplace(portionId).second);
void AddContained(const ui32 portionId, const ui64 minMemoryRead) {
MinMemoryRead += minMemoryRead;
AFL_VERIFY(PortionIds.emplace(portionId, minMemoryRead).second);
}

void RemoveContained(const ui64 portionId) {
void RemoveContained(const ui32 portionId, const ui64 minMemoryRead) {
AFL_VERIFY(minMemoryRead <= MinMemoryRead);
MinMemoryRead -= minMemoryRead;
AFL_VERIFY(PortionIds.erase(portionId));
if (PortionIds.empty()) {
AFL_VERIFY(!MinMemoryRead);
}
}

void RemoveStart(const std::shared_ptr<TPortionInfo>& p) {
Expand All @@ -64,7 +74,9 @@ class TPortionsPKPoint {
class TPortionsIndex {
private:
std::map<NArrow::TReplaceKey, TPortionsPKPoint> Points;
std::map<ui64, i32> CountMemoryUsages;
const TGranuleMeta& Owner;
const NColumnShard::TPortionsIndexCounters& Counters;

std::map<NArrow::TReplaceKey, TPortionsPKPoint>::iterator InsertPoint(const NArrow::TReplaceKey& key) {
auto it = Points.find(key);
Expand All @@ -75,17 +87,35 @@ class TPortionsIndex {
--itPred;
it->second.ProvidePortions(itPred->second);
}
++CountMemoryUsages[it->second.GetMinMemoryRead()];
}
return it;
}

void RemoveFromMemoryUsageControl(const ui64 mem) {
auto it = CountMemoryUsages.find(mem);
AFL_VERIFY(it != CountMemoryUsages.end())("mem", mem);
if (!--it->second) {
CountMemoryUsages.erase(it);
}
}

public:
TPortionsIndex(const TGranuleMeta& owner)
TPortionsIndex(const TGranuleMeta& owner, const NColumnShard::TPortionsIndexCounters& counters)
: Owner(owner)
, Counters(counters)
{

}

ui64 GetMinMemoryRead() const {
if (CountMemoryUsages.empty()) {
return 0;
} else {
return CountMemoryUsages.rbegin()->second;
}
}

const std::map<NArrow::TReplaceKey, TPortionsPKPoint>& GetPoints() const {
return Points;
}
Expand Down

0 comments on commit 2200581

Please sign in to comment.