Skip to content

Commit

Permalink
Merge 1a5b36d into 398fb41
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 23, 2024
2 parents 398fb41 + 1a5b36d commit 07cd75e
Show file tree
Hide file tree
Showing 61 changed files with 3,999 additions and 88 deletions.
19 changes: 12 additions & 7 deletions ydb/core/formats/arrow/common/container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,27 +148,32 @@ std::shared_ptr<NKikimr::NArrow::TGeneralContainer> TGeneralContainer::BuildEmpt
return std::make_shared<TGeneralContainer>(Schema, std::move(columns));
}

std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const TTableConstructionContext& context) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
for (i32 i = 0; i < Schema->num_fields(); ++i) {
if (columnNames && !columnNames->contains(Schema->field(i)->name())) {
if (context.GetColumnNames() && !context.GetColumnNames()->contains(Schema->field(i)->name())) {
continue;
}
columns.emplace_back(Columns[i]->GetChunkedArray());
if (context.GetRecordsCount() || context.GetStartIndex()) {
columns.emplace_back(Columns[i]->Slice(context.GetStartIndex().value_or(0),
context.GetRecordsCount().value_or(GetRecordsCount() - context.GetStartIndex().value_or(0))));
} else {
columns.emplace_back(Columns[i]->GetChunkedArray());
}
fields.emplace_back(Schema->field(i));
}
if (fields.empty()) {
return nullptr;
}
AFL_VERIFY(RecordsCount);
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, context.GetRecordsCount().value_or(*RecordsCount));
}

std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
auto result = BuildTableOptional(columnNames);
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const TTableConstructionContext& context) const {
auto result = BuildTableOptional(context);
AFL_VERIFY(result);
AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size());
AFL_VERIFY(!context.GetColumnNames() || result->schema()->num_fields() == (i32)context.GetColumnNames()->size());
return result;
}

Expand Down
25 changes: 23 additions & 2 deletions ydb/core/formats/arrow/common/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,29 @@ class TGeneralContainer {
return Columns[idx];
}

std::shared_ptr<arrow::Table> BuildTableVerified(const std::optional<std::set<std::string>>& columnNames = {}) const;
std::shared_ptr<arrow::Table> BuildTableOptional(const std::optional<std::set<std::string>>& columnNames = {}) const;
// Options for materializing an arrow::Table from the container.
// Every option is optional; a default-constructed context selects all columns
// and all rows.
class TTableConstructionContext {
private:
    // When set, only schema fields with these names are put into the table.
    YDB_ACCESSOR_DEF(std::optional<std::set<std::string>>, ColumnNames);
    // When set, index of the first row of the slice to take from each column.
    YDB_ACCESSOR_DEF(std::optional<ui32>, StartIndex);
    // When set, number of rows to take from each column.
    YDB_ACCESSOR_DEF(std::optional<ui32>, RecordsCount);

public:
    TTableConstructionContext() = default;

    // Intentionally non-explicit: a set of column names converts to a context.
    TTableConstructionContext(const std::set<std::string>& columnNames)
        : ColumnNames(columnNames) {
    }

    TTableConstructionContext(std::set<std::string>&& columnNames)
        : ColumnNames(std::move(columnNames)) {
    }

    // Replaces the column filter with the (deduplicated) names from `names`.
    void SetColumnNames(const std::vector<TString>& names) {
        std::set<std::string> converted(names.begin(), names.end());
        ColumnNames = std::move(converted);
    }
};

// Builds an arrow::Table according to `context`; fails verification instead of returning nullptr.
std::shared_ptr<arrow::Table> BuildTableVerified(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
// Builds an arrow::Table according to `context`; returns nullptr when no schema field is selected.
std::shared_ptr<arrow::Table> BuildTableOptional(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;

std::shared_ptr<TGeneralContainer> BuildEmptySame() const;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1699,6 +1699,7 @@ message TColumnShardConfig {
optional bool ColumnChunksV0Usage = 25 [default = true];
optional bool ColumnChunksV1Usage = 26 [default = true];
optional uint64 MemoryLimitScanPortion = 27 [default = 100000000];
optional string ReaderClassName = 28;
}

message TSchemeShardConfig {
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,19 @@ message TComputeShardingPolicy {

}

// Scan cursor carried by TEvKqpScan (field ScanCursor).
// NOTE(review): presumably identifies the position from which a column shard
// scan should continue — confirm against the reader implementations.
message TEvKqpScanCursor {
// Cursor variant for the "plain" column shard scan; carries no data.
message TColumnShardScanPlain {
}
// Cursor variant for the "simple" column shard scan.
message TColumnShardScanSimple {
// NOTE(review): presumably the id of the data source the scan stopped at — confirm.
optional uint64 SourceId = 1;
// NOTE(review): presumably the record index inside that source to start from — confirm.
optional uint32 StartRecordIndex = 2;
}
// Exactly one scan implementation's cursor may be set.
oneof Implementation {
TColumnShardScanPlain ColumnShardPlain = 10;
TColumnShardScanSimple ColumnShardSimple = 11;
}
}

message TEvKqpScan {
optional uint64 TxId = 1;
optional uint64 ScanId = 2;
Expand All @@ -1700,6 +1713,7 @@ message TEvKqpScan {
optional TComputeShardingPolicy ComputeShardingPolicy = 23;
optional uint64 LockTxId = 24;
optional uint32 LockNodeId = 25;
optional TEvKqpScanCursor ScanCursor = 26;
}

message TEvCompactTable {
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class TTxInternalScan;
namespace NPlain {
class TIndexScannerConstructor;
}
namespace NSimple {
class TIndexScannerConstructor;
}
} // namespace NReader

namespace NDataSharing {
Expand Down Expand Up @@ -109,7 +112,7 @@ class TSharingSessionsInitializer;
class TInFlightReadsInitializer;
class TSpecialValuesInitializer;
class TTablesManagerInitializer;
}
} // namespace NLoading

extern bool gAllowLogBatchingDefaultValue;

Expand Down Expand Up @@ -198,6 +201,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NOlap::NReader::TTxScan;
friend class NOlap::NReader::TTxInternalScan;
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;
friend class NOlap::NReader::NSimple::TIndexScannerConstructor;

class TStoragesManager;
friend class TTxController;
Expand Down Expand Up @@ -246,7 +250,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
void Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& ctx);

void Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext& ctx);
Expand Down
9 changes: 8 additions & 1 deletion ydb/core/tx/columnshard/counters/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,19 @@ class TConcreteScanCounters: public TScanCounters {
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
std::shared_ptr<TAtomicCounter> ReadTasksCount;
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
std::shared_ptr<TAtomicCounter> ResultsForSourceCount;

public:
TScanAggregations Aggregations;

// Returns a TCounterGuard bound to FetchAccessorsCount.
// NOTE(review): presumably the guard keeps the counter raised for its lifetime,
// which is what InWaiting() observes — confirm against TCounterGuard.
TCounterGuard GetFetcherAcessorsGuard() const {
return TCounterGuard(FetchAccessorsCount);
}

// Returns a TCounterGuard bound to ResultsForSourceCount.
// NOTE(review): presumably the guard keeps the counter raised for its lifetime
// so pending per-source results count as in-flight work — confirm against TCounterGuard.
TCounterGuard GetResultsForSourceGuard() const {
return TCounterGuard(ResultsForSourceCount);
}

// Returns a TCounterGuard bound to MergeTasksCount.
// NOTE(review): presumably the guard keeps the counter raised for its lifetime,
// which is what InWaiting() observes — confirm against TCounterGuard.
TCounterGuard GetMergeTasksGuard() const {
return TCounterGuard(MergeTasksCount);
}
Expand All @@ -320,7 +326,7 @@ class TConcreteScanCounters: public TScanCounters {

// Returns true while any of the tracked in-flight counters is non-zero:
// merge, assemble, read and resource-allocation tasks, accessor fetches,
// and results pending for a source.
bool InWaiting() const {
    return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() ||
           FetchAccessorsCount->Val() || ResultsForSourceCount->Val();
}

void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const {
Expand All @@ -335,6 +341,7 @@ class TConcreteScanCounters: public TScanCounters {
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
, ReadTasksCount(std::make_shared<TAtomicCounter>())
, ResourcesAllocationTasksCount(std::make_shared<TAtomicCounter>())
, ResultsForSourceCount(std::make_shared<TAtomicCounter>())
, Aggregations(TBase::BuildAggregations())
{

Expand Down
100 changes: 100 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,106 @@ std::vector<const TColumnRecord*> TPortionDataAccessor::GetColumnChunksPointers(
return result;
}

// Splits the portion's rows into consecutive read pages for the given entities.
//
// Page borders are placed only on chunk boundaries of the requested entities.
// A border is emitted when the memory estimate accumulated since the previous
// border reaches `memoryLimit`, and always at the end of the portion.
//
// @param memoryLimit  threshold (in raw bytes of chunk metadata) that closes a page
// @param entityIds    entity (column) ids whose chunks take part in the split
// @return pages covering [0, PortionInfo->GetRecordsCount()) without gaps, each
//         with its start row, row count and memory estimate.
//
// Verification fails if the records of a requested entity do not cover the
// whole portion, or if no requested entity has records in a non-empty portion.
std::vector<TPortionDataAccessor::TReadPage> TPortionDataAccessor::BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const {
    // Chunk boundary of a single entity: the row index where a chunk starts,
    // the raw size of the chunk starting there and of the chunk ending there.
    class TEntityDelimiter {
    private:
        YDB_READONLY(ui32, IndexStart, 0);
        YDB_READONLY(ui32, EntityId, 0);
        YDB_READONLY(ui32, ChunkIdx, 0);
        YDB_READONLY(ui64, MemoryStartChunk, 0);
        YDB_READONLY(ui64, MemoryFinishChunk, 0);

    public:
        TEntityDelimiter(const ui32 indexStart, const ui32 entityId, const ui32 chunkIdx, const ui64 memStartChunk, const ui64 memFinishChunk)
            : IndexStart(indexStart)
            , EntityId(entityId)
            , ChunkIdx(chunkIdx)
            , MemoryStartChunk(memStartChunk)
            , MemoryFinishChunk(memFinishChunk) {
        }

        bool operator<(const TEntityDelimiter& item) const {
            return std::tie(IndexStart, EntityId, ChunkIdx) < std::tie(item.IndexStart, item.EntityId, item.ChunkIdx);
        }
    };

    // Boundary shared by all entities at one row index, with running totals:
    // UsedMemory        - raw bytes of every chunk started at or before the index;
    // WholeChunksMemory - raw bytes of every chunk finished at or before the index.
    class TGlobalDelimiter {
    private:
        YDB_READONLY(ui32, IndexStart, 0);
        YDB_ACCESSOR(ui64, UsedMemory, 0);
        YDB_ACCESSOR(ui64, WholeChunksMemory, 0);

    public:
        TGlobalDelimiter(const ui32 indexStart)
            : IndexStart(indexStart) {
        }
    };

    std::vector<TEntityDelimiter> delimiters;

    // Step 1: collect per-entity chunk boundaries. The loop resets its state on
    // every entity id change, i.e. it relies on records being grouped by entity.
    ui32 lastAppliedId = 0;
    ui32 currentRecordIdx = 0;
    bool needOne = false;
    const TColumnRecord* lastRecord = nullptr;
    for (auto&& i : GetRecordsVerified()) {
        if (lastAppliedId != i.GetEntityId()) {
            if (delimiters.size()) {
                // The previously collected entity must have reached the end of the portion.
                AFL_VERIFY(delimiters.back().GetIndexStart() == PortionInfo->GetRecordsCount());
            }
            needOne = entityIds.contains(i.GetEntityId());
            currentRecordIdx = 0;
            lastAppliedId = i.GetEntityId();
            lastRecord = nullptr;
        }
        if (!needOne) {
            continue;
        }
        delimiters.emplace_back(
            currentRecordIdx, i.GetEntityId(), i.GetChunkIdx(), i.GetMeta().GetRawBytes(), lastRecord ? lastRecord->GetMeta().GetRawBytes() : 0);
        currentRecordIdx += i.GetMeta().GetRecordsCount();
        if (currentRecordIdx == PortionInfo->GetRecordsCount()) {
            // Terminal boundary: nothing starts here, the last chunk finishes here.
            delimiters.emplace_back(currentRecordIdx, i.GetEntityId(), i.GetChunkIdx() + 1, 0, i.GetMeta().GetRawBytes());
        }
        lastRecord = &i;
    }

    // Step 2: merge boundaries of all entities by row index, carrying running totals.
    std::sort(delimiters.begin(), delimiters.end());
    std::vector<TGlobalDelimiter> sumDelimiters;
    for (auto&& i : delimiters) {
        if (sumDelimiters.empty()) {
            sumDelimiters.emplace_back(i.GetIndexStart());
        } else if (sumDelimiters.back().GetIndexStart() != i.GetIndexStart()) {
            AFL_VERIFY(sumDelimiters.back().GetIndexStart() < i.GetIndexStart());
            TGlobalDelimiter backDelimiter(i.GetIndexStart());
            backDelimiter.MutableWholeChunksMemory() = sumDelimiters.back().GetWholeChunksMemory();
            backDelimiter.MutableUsedMemory() = sumDelimiters.back().GetUsedMemory();
            sumDelimiters.emplace_back(std::move(backDelimiter));
        }
        sumDelimiters.back().MutableWholeChunksMemory() += i.GetMemoryFinishChunk();
        sumDelimiters.back().MutableUsedMemory() += i.GetMemoryStartChunk();
    }

    // Step 3: choose page borders.
    std::vector<ui32> recordIdx = { 0 };
    std::vector<ui64> packMemorySize;
    // front() on an empty vector is undefined behaviour; with no boundaries the
    // loop below does not run, so the pointer is never dereferenced.
    const TGlobalDelimiter* lastBorder = sumDelimiters.empty() ? nullptr : &sumDelimiters.front();
    for (auto&& i : sumDelimiters) {
        const i64 sumMemory = (i64)i.GetUsedMemory() - (i64)lastBorder->GetWholeChunksMemory();
        AFL_VERIFY(sumMemory > 0);
        if (((ui64)sumMemory >= memoryLimit || i.GetIndexStart() == PortionInfo->GetRecordsCount()) && i.GetIndexStart()) {
            AFL_VERIFY(lastBorder->GetIndexStart() < i.GetIndexStart());
            recordIdx.emplace_back(i.GetIndexStart());
            packMemorySize.emplace_back(sumMemory);
            lastBorder = &i;
        }
    }
    AFL_VERIFY(recordIdx.front() == 0);
    AFL_VERIFY(recordIdx.back() == PortionInfo->GetRecordsCount());
    AFL_VERIFY(recordIdx.size() == packMemorySize.size() + 1);

    // Step 4: turn consecutive borders into pages.
    std::vector<TReadPage> pages;
    pages.reserve(packMemorySize.size());
    for (ui32 i = 0; i < packMemorySize.size(); ++i) {
        pages.emplace_back(recordIdx[i], recordIdx[i + 1] - recordIdx[i], packMemorySize[i]);
    }
    return pages;
}

std::vector<TPortionDataAccessor::TPage> TPortionDataAccessor::BuildPages() const {
std::vector<TPage> pages;
struct TPart {
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,23 @@ class TPortionDataAccessor {

std::vector<TPage> BuildPages() const;
ui64 GetMinMemoryForReadColumns(const std::optional<std::set<ui32>>& columnIds) const;

// One page produced by BuildReadPages: a contiguous, non-empty range of rows
// of the portion together with a memory estimate for it.
class TReadPage {
private:
    // Index of the first row of the page.
    YDB_READONLY(ui32, IndexStart, 0);
    // Number of rows in the page; verified to be non-zero on construction.
    YDB_READONLY(ui32, RecordsCount, 0);
    // Memory estimate attributed to the page by BuildReadPages.
    YDB_READONLY(ui64, MemoryUsage, 0);

public:
    TReadPage(const ui32 indexStart, const ui32 recordsCount, const ui64 memoryUsage)
        : IndexStart{ indexStart }
        , RecordsCount{ recordsCount }
        , MemoryUsage{ memoryUsage }
    {
        AFL_VERIFY(RecordsCount);
    }
};

// Splits the portion's rows into consecutive pages along chunk boundaries of the
// given entities, closing a page once its memory estimate reaches memoryLimit.
std::vector<TReadPage> BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const;
};

} // namespace NKikimr::NOlap
Loading

0 comments on commit 07cd75e

Please sign in to comment.