Skip to content

Commit

Permalink
change memory planner for CS scan (#7372)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 12, 2024
1 parent f93ea93 commit a2e8760
Show file tree
Hide file tree
Showing 74 changed files with 2,492 additions and 371 deletions.
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ struct TKikimrEvents : TEvents {
ES_SS_BG_TASKS = 4257,
ES_LIMITER = 4258,
ES_MEMORY = 4259,
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
};
};

Expand Down
23 changes: 23 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@
#include <ydb/core/tx/limiter/usage/config.h>
#include <ydb/core/tx/limiter/usage/service.h>

#include <ydb/core/tx/limiter/grouped_memory/usage/config.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>

#include <ydb/core/backup/controller/tablet.h>

#include <ydb/services/ext_index/common/config.h>
Expand Down Expand Up @@ -2216,6 +2219,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}

TGroupedMemoryLimiterInitializer::TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}

void TGroupedMemoryLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
NOlap::NGroupedMemoryManager::TConfig serviceConfig;
Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetGroupedMemoryLimiterConfig()));

if (serviceConfig.IsEnabled()) {
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup("type", "TX_GROUPED_MEMORY_LIMITER");

auto service = NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::CreateService(serviceConfig, countersGroup);

setup->LocalServices.push_back(std::make_pair(
NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(NodeId),
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
}
}

TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,12 @@ class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
public:
TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TCompConveyorInitializer: public IKikimrServicesInitializer {
public:
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
}

if (serviceMask.EnableGroupedMemoryLimiter) {
sil->AddServiceInitializer(new TGroupedMemoryLimiterInitializer(runConfig));
}

if (serviceMask.EnableScanConveyor) {
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/service_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ union TBasicKikimrServicesMask {
bool EnableDatabaseMetadataCache:1;
bool EnableGraphService:1;
bool EnableCompDiskLimiter:1;
bool EnableGroupedMemoryLimiter:1;
};

struct {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ PEERDIR(
ydb/core/security
ydb/core/security/ldap_auth_provider
ydb/core/statistics/aggregator
ydb/core/statistics/service
ydb/core/statistics/service
ydb/core/sys_view/processor
ydb/core/sys_view/service
ydb/core/tablet
Expand All @@ -113,6 +113,7 @@ PEERDIR(
ydb/core/tx/coordinator
ydb/core/tx/conveyor/service
ydb/core/tx/limiter/service
ydb/core/tx/limiter/grouped_memory/usage
ydb/core/tx/datashard
ydb/core/tx/long_tx_service
ydb/core/tx/long_tx_service/public
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,18 @@ std::shared_ptr<arrow::RecordBatch> ReallocateBatch(std::shared_ptr<arrow::Recor
return DeserializeBatch(SerializeBatch(original, arrow::ipc::IpcWriteOptions::Defaults()), original->schema());
}

std::shared_ptr<arrow::Table> ReallocateBatch(const std::shared_ptr<arrow::Table>& original) {
if (!original) {
return original;
}
auto batches = NArrow::SliceToRecordBatches(original);
for (auto&& i : batches) {
i = NArrow::TStatusValidator::GetValid(
NArrow::NSerialization::TNativeSerializer().Deserialize(NArrow::NSerialization::TNativeSerializer().SerializeFull(i)));
}
return NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches(batches));
}

std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
std::vector<std::shared_ptr<arrow::Array>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/arrow_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Ar
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
std::shared_ptr<arrow::RecordBatch> ReallocateBatch(std::shared_ptr<arrow::RecordBatch> original);
std::shared_ptr<arrow::Table> ReallocateBatch(const std::shared_ptr<arrow::Table>& original);

bool HasNulls(const std::shared_ptr<arrow::Array>& column);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/datatime64_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ Y_UNIT_TEST_SUITE(KqpDatetime64ColumnShard) {
runnerSettings.WithSampleTables = false;

TTestHelper testHelper(runnerSettings);
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).SetComponents({ NKikimrServices::GROUPED_MEMORY_LIMITER }, "CS").Initialize();

TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int64).SetNullable(false),
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/ut/olap/helpers/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ void TestAggregationsBase(const std::vector<TAggregationTestCase>& cases) {

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::GROUPED_MEMORY_LIMITER, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").Initialize();

{
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
Expand Down Expand Up @@ -49,10 +50,11 @@ void TestAggregationsInternal(const std::vector<TAggregationTestCase>& cases) {
Tests::TServer::TPtr server = new Tests::TServer(settings);

auto runtime = server->GetRuntime();
Tests::NCommon::TLoggerInit(runtime).Initialize();
Tests::NCommon::TLoggerInit(runtime).SetComponents({ NKikimrServices::GROUPED_MEMORY_LIMITER }, "CS").Initialize();
auto sender = runtime->AllocateEdgeActor();

InitRoot(server, sender);
Tests::NCommon::TLoggerInit(runtime).Initialize();

ui32 numShards = 1;
ui32 numIterations = 10;
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ message TLimiterConfig {
optional uint64 PeriodMilliSeconds = 3 [default = 1000];
}

message TGroupedMemoryLimiterConfig {
optional bool Enabled = 1 [default = true];
optional uint64 MemoryLimit = 2;
}

message TExternalIndexConfig {
optional bool Enabled = 1 [default = true];
optional TInternalRequestConfig RequestConfig = 2;
Expand Down Expand Up @@ -1525,7 +1530,6 @@ message TColumnShardConfig {
repeated TRepairInfo Repairs = 15;

optional uint32 MaxInFlightIntervalsOnRequest = 16;
optional uint32 MaxInFlightMemoryOnRequest = 17;
optional uint32 MaxReadStaleness_ms = 18 [default = 300000];
}

Expand Down Expand Up @@ -1906,6 +1910,7 @@ message TAppConfig {
optional TLimiterConfig CompDiskLimiterConfig = 79;
optional TMetadataCacheConfig MetadataCacheConfig = 80;
optional TMemoryControllerConfig MemoryControllerConfig = 81;
optional TGroupedMemoryLimiterConfig GroupedMemoryLimiterConfig = 82;

repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
#include <ydb/services/ext_index/service/executor.h>
#include <ydb/core/tx/conveyor/service/service.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
#include <ydb/library/folder_service/mock/mock_folder_service_adapter.h>

#include <ydb/core/client/server/ic_nodes_cache_service.h>
Expand Down Expand Up @@ -767,6 +768,11 @@ namespace Tests {
const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
{
auto* actor = NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::CreateService(NOlap::NGroupedMemoryManager::TConfig(), new ::NMonitoring::TDynamicCounters());
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
{
auto* actor = NConveyor::TScanServiceOperator::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters());
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ PEERDIR(
ydb/services/discovery
ydb/services/ext_index/service
ydb/core/tx/conveyor/service
ydb/core/tx/limiter/grouped_memory/usage
ydb/services/fq
ydb/services/kesus
ydb/services/persqueue_cluster_discovery
Expand Down
40 changes: 0 additions & 40 deletions ydb/core/tx/columnshard/counters/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,55 +287,16 @@ class TCounterGuard: TNonCopyable {

};

class TReaderResourcesGuard {
private:
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> Allocated;
std::shared_ptr<TAtomicCounter> Requested;
const std::shared_ptr<NOlap::TMemoryAggregation> SignalCounter;
const ui64 Volume;

public:
TReaderResourcesGuard(const ui64 volume, const std::shared_ptr<TAtomicCounter>& requested, const std::shared_ptr<NOlap::TMemoryAggregation>& signalWatcher)
: Requested(requested)
, SignalCounter(signalWatcher)
, Volume(volume)
{
AFL_VERIFY(Requested);
Requested->Add(Volume);
SignalCounter->AddBytes(volume);
}

void InitResources(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& g) {
AFL_VERIFY(!Allocated);
AFL_VERIFY(g->GetMemory() == Volume)("volume", Volume)("allocated", g->GetMemory());
Allocated = g;
}

~TReaderResourcesGuard() {
SignalCounter->RemoveBytes(Volume);
AFL_VERIFY(Requested->Sub(Volume) >= 0);
}
};

class TConcreteScanCounters: public TScanCounters {
private:
using TBase = TScanCounters;
std::shared_ptr<TAtomicCounter> RequestedResourcesBytes;
std::shared_ptr<TAtomicCounter> MergeTasksCount;
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
std::shared_ptr<TAtomicCounter> ReadTasksCount;
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
public:
TScanAggregations Aggregations;

ui64 GetRequestedMemoryBytes() const {
return RequestedResourcesBytes->Val();
}

std::shared_ptr<TReaderResourcesGuard> BuildRequestedResourcesGuard(const ui64 volume) const {
return std::make_shared<TReaderResourcesGuard>(volume, RequestedResourcesBytes, Aggregations.GetRequestedResourcesMemory());
}

TCounterGuard GetMergeTasksGuard() const {
return TCounterGuard(MergeTasksCount);
}
Expand Down Expand Up @@ -363,7 +324,6 @@ class TConcreteScanCounters: public TScanCounters {

TConcreteScanCounters(const TScanCounters& counters)
: TBase(counters)
, RequestedResourcesBytes(std::make_shared<TAtomicCounter>())
, MergeTasksCount(std::make_shared<TAtomicCounter>())
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
, ReadTasksCount(std::make_shared<TAtomicCounter>())
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ TPortionInfo::TPreparedBatchData PrepareForAssembleImpl(const TPortionInfo& port
columns.reserve(arrowResultSchema->num_fields());
const ui32 rowsCount = portion.GetRecordsCount();
for (auto&& i : arrowResultSchema->fields()) {
columns.emplace_back(rowsCount, dataSchema.GetColumnLoaderOptional(i->name()), resultSchema.GetColumnLoaderOptional(i->name()));
columns.emplace_back(rowsCount, dataSchema.GetColumnLoaderOptional(i->name()), resultSchema.GetColumnLoaderVerified(i->name()));
}
{
int skipColumnId = -1;
Expand Down Expand Up @@ -716,6 +716,7 @@ std::shared_ptr<NArrow::TGeneralContainer> TPortionInfo::TPreparedBatchData::Ass
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto&& i : Columns) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("column", i.GetField()->ToString())("id", i.GetColumnId());
if (sequentialColumnIds.contains(i.GetColumnId())) {
columns.emplace_back(i.AssembleForSeqAccess());
} else {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/reader/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TScanIteratorBase {
return {};
}
virtual bool Finished() const = 0;
virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() = 0;
virtual TConclusion<std::shared_ptr<TPartialReadResult>> GetBatch() = 0;
virtual void PrepareResults() {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class IDataReader {
virtual TString DoDebugString(const bool verbose) const = 0;
virtual void DoAbort() = 0;
virtual bool DoIsFinished() const = 0;
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0;
virtual std::vector<std::shared_ptr<TPartialReadResult>> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0;
virtual TConclusion<bool> DoReadNextInterval() = 0;
public:
IDataReader(const std::shared_ptr<TReadContext>& context);
Expand Down Expand Up @@ -153,7 +153,7 @@ class IDataReader {
return *result;
}

std::vector<TPartialReadResult> ExtractReadyResults(const int64_t maxRowsInBatch) {
std::vector<std::shared_ptr<TPartialReadResult>> ExtractReadyResults(const int64_t maxRowsInBatch) {
return DoExtractReadyResults(maxRowsInBatch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ struct TReadMetadataBase {

protected:
std::shared_ptr<ISnapshotSchema> ResultIndexSchema;
ui64 TxId = 0;

public:
using TConstPtr = std::shared_ptr<const TReadMetadataBase>;

ui64 GetTxId() const {
return TxId;
}

const TVersionedIndex& GetIndexVersions() const {
AFL_VERIFY(IndexVersionsPointer);
return *IndexVersionsPointer;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ bool TColumnShardScan::ProduceResults() noexcept {
return false;
}

std::optional<TPartialReadResult> resultOpt = resultConclusion.DetachResult();
std::shared_ptr<TPartialReadResult> resultOpt = resultConclusion.DetachResult();
if (!resultOpt) {
ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct TReadDescription {
TProgramContainer Program;
public:
// Table
ui64 TxId = 0;
ui64 PathId = 0;
TString TableName;
bool ReadNothing = false;
Expand Down
15 changes: 8 additions & 7 deletions ydb/core/tx/columnshard/engines/reader/common/result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ namespace NKikimr::NOlap::NReader {

class TCurrentBatch {
private:
std::vector<TPartialReadResult> Results;
std::vector<std::shared_ptr<TPartialReadResult>> Results;
ui64 RecordsCount = 0;
public:
ui64 GetRecordsCount() const {
return RecordsCount;
}

void AddChunk(TPartialReadResult&& res) {
RecordsCount += res.GetRecordsCount();
void AddChunk(std::shared_ptr<TPartialReadResult>&& res) {
RecordsCount += res->GetRecordsCount();
Results.emplace_back(std::move(res));
}

void FillResult(std::vector<TPartialReadResult>& result) const {
void FillResult(std::vector<std::shared_ptr<TPartialReadResult>>& result) const {
if (Results.empty()) {
return;
}
Expand All @@ -26,11 +26,12 @@ class TCurrentBatch {
}
};

std::vector<TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult) {
std::vector<std::shared_ptr<TPartialReadResult>> TPartialReadResult::SplitResults(
std::vector<std::shared_ptr<TPartialReadResult>>&& resultsExt, const ui32 maxRecordsInResult) {
std::vector<TCurrentBatch> resultBatches;
TCurrentBatch currentBatch;
for (auto&& i : resultsExt) {
AFL_VERIFY(i.GetRecordsCount());
AFL_VERIFY(i->GetRecordsCount());
currentBatch.AddChunk(std::move(i));
if (currentBatch.GetRecordsCount() >= maxRecordsInResult) {
resultBatches.emplace_back(std::move(currentBatch));
Expand All @@ -41,7 +42,7 @@ std::vector<TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPa
resultBatches.emplace_back(std::move(currentBatch));
}

std::vector<TPartialReadResult> result;
std::vector<std::shared_ptr<TPartialReadResult>> result;
for (auto&& i : resultBatches) {
i.FillResult(result);
}
Expand Down
Loading

0 comments on commit a2e8760

Please sign in to comment.