Skip to content

Commit

Permalink
compaction speedup (#10323)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 23, 2024
1 parent 0ea658a commit acb4df3
Show file tree
Hide file tree
Showing 136 changed files with 3,140 additions and 376 deletions.
9 changes: 9 additions & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_SumL_GroupL_OrderL
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata
ydb/core/tx/columnshard/ut_rw Normalizers.CleanEmptyPortionsNormalizer
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/query KqpLimits.ComputeActorMemoryAllocationFailureQueryService
ydb/core/kqp/ut/query KqpLimits.QueryExecTimeoutCancel
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
Expand Down
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ struct TKikimrEvents : TEvents {
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
ES_INCREMENTAL_RESTORE_SCAN = 4261,
ES_FEATURE_FLAGS = 4262,
ES_PRIORITY_QUEUE = 4263,
};
};

Expand Down
24 changes: 24 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@
#include <ydb/core/tx/conveyor/service/service.h>
#include <ydb/core/tx/conveyor/usage/config.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/priorities/usage/config.h>
#include <ydb/core/tx/priorities/usage/service.h>
#include <ydb/core/tx/limiter/service/service.h>
#include <ydb/core/tx/limiter/usage/config.h>
#include <ydb/core/tx/limiter/usage/service.h>
Expand Down Expand Up @@ -2199,6 +2201,28 @@ void TCompDiskLimiterInitializer::InitializeServices(NActors::TActorSystemSetup*
}
}

// Builds the initializer for the compaction priorities queue service.
// All state lives in the IKikimrServicesInitializer base, which receives the run config.
TCompPrioritiesInitializer::TCompPrioritiesInitializer(const TKikimrRunConfig& runConfig)
    : IKikimrServicesInitializer(runConfig)
{}

// Adds the compaction priorities queue service to the node's local services.
//
// The service config starts from NPrioritiesQueue::TConfig defaults and is overridden
// by the CompPrioritiesConfig section of the application config when that section is
// present. Nothing is registered when the resulting config reports itself as disabled.
void TCompPrioritiesInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
    NPrioritiesQueue::TConfig queueConfig;
    if (Config.HasCompPrioritiesConfig()) {
        // The deserialization result is checked with Y_ABORT_UNLESS.
        Y_ABORT_UNLESS(queueConfig.DeserializeFromProto(Config.GetCompPrioritiesConfig()));
    }

    if (!queueConfig.IsEnabled()) {
        return;
    }

    // Counters for the service live under tablets / type=TX_COMP_PRIORITIES.
    TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletCounters = GetServiceCounters(appData->Counters, "tablets");
    TIntrusivePtr<::NMonitoring::TDynamicCounters> prioritiesCounters = tabletCounters->GetSubgroup("type", "TX_COMP_PRIORITIES");

    auto queueActor = NPrioritiesQueue::TCompServiceOperator::CreateService(queueConfig, prioritiesCounters);

    setup->LocalServices.push_back(std::make_pair(
        NPrioritiesQueue::TCompServiceOperator::MakeServiceId(NodeId),
        TActorSetupCmd(queueActor, TMailboxType::HTSwap, appData->UserPoolId)));
}

// Forwards the run config to the IKikimrServicesInitializer base; no other state is set here.
TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ class TGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

// Service initializer for the compaction priorities queue (NPrioritiesQueue) service.
class TCompPrioritiesInitializer: public IKikimrServicesInitializer {
public:
// Stores the run config in the base initializer.
TCompPrioritiesInitializer(const TKikimrRunConfig& runConfig);
// Adds the service to setup->LocalServices when its config is enabled.
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TCompConveyorInitializer: public IKikimrServicesInitializer {
public:
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
}

if (serviceMask.EnableCompPriorities) {
sil->AddServiceInitializer(new TCompPrioritiesInitializer(runConfig));
}

if (serviceMask.EnableCompConveyor) {
sil->AddServiceInitializer(new TCompConveyorInitializer(runConfig));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/service_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ union TBasicKikimrServicesMask {
bool EnableCompDiskLimiter:1;
bool EnableGroupedMemoryLimiter:1;
bool EnableAwsService:1;
bool EnableCompPriorities : 1;
};

struct {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/formats/arrow/accessor/plain/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class TTrivialArray: public IChunkedArray {
}

public:
// Returns the wrapped arrow array by const reference (no copy of the shared_ptr is made).
const std::shared_ptr<arrow::Array>& GetArray() const {
return Array;
}

TTrivialArray(const std::shared_ptr<arrow::Array>& data)
: TBase(data->length(), EType::Array, data->type())
, Array(data) {
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/formats/arrow/accessor/plain/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ std::shared_ptr<arrow::Schema> TConstructor::DoGetExpectedSchema(const std::shar

// Packs a single column accessor into a one-column record batch whose field is named "val"
// and typed with externalInfo.GetColumnType().
//
// A plain array accessor (EType::Array) is placed into the batch directly, so no chunked
// array or intermediate table is built for it. Every other accessor type goes through
// GetChunkedArray() and arrow::Table.
std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct(
    const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
    auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) }));
    if (columnData->GetType() == IChunkedArray::EType::Array) {
        // The type tag was checked above, so the downcast to TTrivialArray is done statically.
        const auto* arr = static_cast<const TTrivialArray*>(columnData.get());
        return arrow::RecordBatch::Make(schema, columnData->GetRecordsCount(), { arr->GetArray() });
    }

    auto chunked = columnData->GetChunkedArray();
    auto table = arrow::Table::Make(schema, { chunked }, columnData->GetRecordsCount());
    // NOTE(review): the second ToBatch argument presumably requests combining chunks;
    // it is only set when there is more than one chunk - confirm against NArrow::ToBatch.
    return NArrow::ToBatch(table, chunked->num_chunks() > 1);
}

} // namespace NKikimr::NArrow::NAccessor::NPlain
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/program.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ arrow::Status TDatumBatch::AddColumn(const std::string& name, arrow::Datum&& col

auto field = arrow::field(name, column.type());
// A null field is reported separately: the type-mismatch message below dereferences
// `field`, which must not happen when field creation itself failed.
if (!field) {
    return arrow::Status::Invalid("Cannot create field " + name + ".");
}
if (!field->type()->Equals(column.type())) {
    return arrow::Status::Invalid("Cannot create field " + name + ". type:" + field->type()->ToString() + " vs " + column.type()->ToString());
}
if (!column.is_scalar() && column.length() != Rows) {
return arrow::Status::Invalid("Wrong column length.");
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/formats/arrow/reader/position.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ TSortableScanData::TSortableScanData(
BuildPosition(position);
}

// Builds scan data over every column of `batch`, in schema order, and positions it
// at row `position`. Each arrow array is wrapped into a TTrivialArray accessor.
TSortableScanData::TSortableScanData(
    const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch) {
    Fields = batch->schema()->fields();
    const int columnsCount = batch->num_columns();
    for (int columnIdx = 0; columnIdx < columnsCount; ++columnIdx) {
        Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(batch->column(columnIdx)));
    }
    BuildPosition(position);
}

TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr<arrow::Table>& batch, const std::vector<std::string>& columns) {
for (auto&& i : columns) {
auto c = batch->GetColumnByName(i);
Expand Down
52 changes: 45 additions & 7 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class TSortableScanData {
return StartPosition <= position && position < FinishPosition;
}
public:
TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch);
TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns);
TSortableScanData(const ui64 position, const std::shared_ptr<arrow::Table>& batch, const std::vector<std::string>& columns);
TSortableScanData(const ui64 position, const std::shared_ptr<TGeneralContainer>& batch, const std::vector<std::string>& columns);
Expand Down Expand Up @@ -357,6 +358,19 @@ class TSortableBatchPosition {
Y_ABORT_UNLESS(Sorting->GetColumns().size());
}

// Creates a position at row `position` of `batch` without an explicit column list:
// the batch is handed as-is to the (position, batch) constructor of TSortableScanData.
// Preconditions enforced here: `batch` is non-null, has a non-zero row count, and
// `position` is strictly less than the row count.
template <class TRecords>
TSortableBatchPosition(const std::shared_ptr<TRecords>& batch, const ui32 position, const bool reverseSort)
: Position(position)
, ReverseSort(reverseSort) {
Y_ABORT_UNLESS(batch);
Y_ABORT_UNLESS(batch->num_rows());
RecordsCount = batch->num_rows();
AFL_VERIFY(Position < RecordsCount)("position", Position)("count", RecordsCount);
Sorting = std::make_shared<TSortableScanData>(Position, batch);
// NOTE(review): full batch validation is behind the debug-only macro, presumably to keep it off the release path - confirm.
Y_DEBUG_ABORT_UNLESS(batch->ValidateFull().ok());
// The resulting scan data must expose at least one sorting column.
Y_ABORT_UNLESS(Sorting->GetColumns().size());
}

std::partial_ordering GetReverseForCompareResult(const std::partial_ordering directResult) const {
if (directResult == std::partial_ordering::less) {
return std::partial_ordering::greater;
Expand Down Expand Up @@ -496,19 +510,17 @@ class TIntervalPositions {

// Appends an interval border. Borders are expected to arrive in strictly increasing
// order; the order is checked once, with the debug-level verify macro, instead of
// being checked twice per insertion.
void AddPosition(TIntervalPosition&& intervalPosition) {
    if (Positions.size()) {
        AFL_VERIFY_DEBUG(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson());
    }
    Positions.emplace_back(std::move(intervalPosition));
}

// Wraps `position` into a TIntervalPosition and appends it exactly once.
// `position` is moved from a single time; it must not be reused after the move.
void AddPosition(TSortableBatchPosition&& position, const bool includePositionToLeftInterval) {
    AddPosition(TIntervalPosition(std::move(position), includePositionToLeftInterval));
}

// Copies `position` into a TIntervalPosition and appends it exactly once
// (a second insertion of the same border would break the strictly-increasing order).
void AddPosition(const TSortableBatchPosition& position, const bool includePositionToLeftInterval) {
    AddPosition(TIntervalPosition(position, includePositionToLeftInterval));
}
};

Expand Down Expand Up @@ -580,7 +592,11 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly
result.emplace_back(nullptr);
return result;
}
if (!it.IsValid()) {
return { batch };
}
TRWSortableBatchPosition pos(batch, 0, columnNames, {}, false);
it.SkipToUpper(pos);
bool batchFinished = false;
i64 recordsCountSplitted = 0;
for (; it.IsValid() && !batchFinished; it.Next()) {
Expand Down Expand Up @@ -636,6 +652,10 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly
const auto& CurrentPosition() const {
return Current->first;
}

// Intentionally a no-op: this iterator does not skip ahead and keeps its current position.
void SkipToUpper(const TSortableBatchPosition& /*toPos*/) {
}
};

template <class TContainer>
Expand Down Expand Up @@ -666,6 +686,10 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly
const auto& CurrentPosition() const {
return *Current;
}

// Intentionally a no-op: this iterator does not skip ahead and keeps its current position.
void SkipToUpper(const TSortableBatchPosition& /*toPos*/) {
}
};

template <class TContainer>
Expand All @@ -676,8 +700,8 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly

class TIntervalPointsIterator {
private:
typename TIntervalPositions::const_iterator Current;
typename TIntervalPositions::const_iterator End;
TIntervalPositions::const_iterator Current;
TIntervalPositions::const_iterator End;

public:
TIntervalPointsIterator(const TIntervalPositions& container)
Expand All @@ -696,6 +720,20 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly
const auto& CurrentPosition() const {
return Current->GetPosition();
}

struct TComparator {
bool operator()(const TIntervalPosition& pos, const TSortableBatchPosition& value) const {
return pos.GetPosition() < value;
}
bool operator()(const TSortableBatchPosition& value, const TIntervalPosition& pos) const {
return value < pos.GetPosition();
}

};

// Moves Current to the first remaining border that compares greater than `toPos`
// (or to End when there is none), using a binary search over [Current, End).
void SkipToUpper(const TSortableBatchPosition& toPos) {
    const auto firstGreater = std::upper_bound(Current, End, toPos, TComparator());
    Current = firstGreater;
}
};

static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInIntervalPositions(
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/formats/arrow/save_load/loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCou
return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type());
}

// Parses a serialized column blob and builds an accessor for it.
// Unlike ApplyVerified, a parse failure is logged and returned as a failed
// conclusion carrying the arrow status text.
TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::ApplyConclusion(const TString& dataStr, const ui32 recordsCount) const {
    auto parsed = Apply(dataStr);
    if (!parsed.ok()) {
        AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_blob")("data_size", dataStr.size())(
            "expected_records_count", recordsCount)("problem", parsed.status().ToString());
        return TConclusionStatus::Fail(parsed.status().ToString());
    }
    return BuildAccessor(*parsed, BuildAccessorContext(recordsCount));
}

// Parses a serialized column blob and builds an accessor for `recordsCount` records.
// NOTE(review): TStatusValidator::GetValid presumably aborts on a failed parse - confirm;
// ApplyConclusion is the variant that returns the failure to the caller instead.
std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount) const {
auto data = TStatusValidator::GetValid(Apply(dataStr));
return BuildAccessor(data, BuildAccessorContext(recordsCount));
}
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/save_load/loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TColumnLoader {

TChunkConstructionData BuildAccessorContext(const ui32 recordsCount) const;
std::shared_ptr<IChunkedArray> ApplyVerified(const TString& data, const ui32 expectedRecordsCount) const;
TConclusion<std::shared_ptr<IChunkedArray>> ApplyConclusion(const TString& data, const ui32 expectedRecordsCount) const;
std::shared_ptr<arrow::RecordBatch> ApplyRawVerified(const TString& data) const;
};

Expand Down
8 changes: 7 additions & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,11 @@ message TConveyorConfig {
optional double WorkersCountDouble = 5;
}

// Settings of a priorities queue service (referenced from TAppConfig as CompPrioritiesConfig).
message TPrioritiesQueueConfig {
// Whether the service is enabled; treated as true when the field is not set.
optional bool Enabled = 1 [default = true];
// Queue limit, 32 when not set. NOTE(review): what exactly is limited is not visible here - confirm.
optional uint32 Limit = 2 [default = 32];
}

message TLimiterConfig {
optional bool Enabled = 1 [default = true];
optional uint64 Limit = 2;
Expand Down Expand Up @@ -2039,9 +2044,10 @@ message TAppConfig {
optional TLimiterConfig CompDiskLimiterConfig = 79;
optional TMetadataCacheConfig MetadataCacheConfig = 80;
optional TMemoryControllerConfig MemoryControllerConfig = 81;
optional TGroupedMemoryLimiterConfig GroupedMemoryLimiterConfig = 82;
optional TGroupedMemoryLimiterConfig GroupedMemoryLimiterConfig = 82;
optional NKikimrReplication.TReplicationDefaults ReplicationConfig = 83;
optional TShutdownConfig ShutdownConfig = 84;
optional TPrioritiesQueueConfig CompPrioritiesConfig = 85;

repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/console_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ message TConfigItem {
BackgroundCleaningConfigItem = 77;
MetadataCacheConfigItem = 80;
MemoryControllerConfigItem = 81;
GroupedMemoryLimiterConfig = 82;
ReplicationConfigItem = 83;
CompPrioritiesConfig = 85;

NamedConfigsItem = 100;
ClusterYamlConfigItem = 101;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,14 @@ message TCompactionPlannerConstructorContainer {
optional uint32 FreshnessCheckDurationSeconds = 2 [default = 300];
}

// Settings of the planner selected through the LCBuckets alternative; it has no fields yet.
message TLCOptimizer {

}

// Selects the compaction planner implementation; at most one alternative is set.
oneof Implementation {
TLOptimizer LBuckets = 20;
TSOptimizer SBuckets = 21;
TLCOptimizer LCBuckets = 22;
}
}

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ struct Schema : NIceDb::Schema {
struct TierName: Column<11, NScheme::NTypeIds::Utf8> {};
struct Stats: Column<12, NScheme::NTypeIds::Utf8> {};
struct Optimized: Column<13, NScheme::NTypeIds::Uint8> {};
struct CompactionLevel: Column<14, NScheme::NTypeIds::Uint64> {};
struct Details: Column<15, NScheme::NTypeIds::Utf8> {};

using TKey = TableKey<PathId, TabletId, PortionId>;
using TColumns = TableColumns<
Expand All @@ -549,7 +551,9 @@ struct Schema : NIceDb::Schema {
Activity,
TierName,
Stats,
Optimized
Optimized,
CompactionLevel,
Details
>;
};

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
#include <ydb/services/ext_index/service/executor.h>
#include <ydb/core/tx/conveyor/service/service.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/priorities/usage/service.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
#include <ydb/library/folder_service/mock/mock_folder_service_adapter.h>

Expand Down Expand Up @@ -832,6 +833,11 @@ namespace Tests {
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
{
// Compaction priorities queue service: created with a default config and a fresh counters
// group, then registered on this node under the service id derived from the node id.
auto* actor = NPrioritiesQueue::TCompServiceOperator::CreateService(NPrioritiesQueue::TConfig(), new ::NMonitoring::TDynamicCounters());
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NPrioritiesQueue::TCompServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
{
auto* actor = NConveyor::TScanServiceOperator::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters());
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ PEERDIR(
ydb/services/ext_index/service
ydb/services/ymq
ydb/core/tx/conveyor/service
ydb/core/tx/priorities/service
ydb/core/tx/limiter/grouped_memory/usage
ydb/services/fq
ydb/services/kesus
Expand Down
Loading

0 comments on commit acb4df3

Please sign in to comment.