Skip to content

Commit

Permalink
Speed up SIMPLE scanner (#12164)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 2, 2024
1 parent 1cf8c98 commit 6fc18f8
Show file tree
Hide file tree
Showing 36 changed files with 633 additions and 904 deletions.
5 changes: 2 additions & 3 deletions ydb/core/formats/arrow/accessor/plain/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ class TChunkAccessor {
return (ui64)ChunkedArray->num_chunks();
}
ui64 GetChunkLength(const ui32 idx) const {
return (ui64)ChunkedArray->chunk(idx)->length();
return (ui64)ChunkedArray->chunks()[idx]->length();
}
void OnArray(const ui32 idx, const ui32 startPosition) const {
const auto& arr = ChunkedArray->chunk(idx);
*Result = IChunkedArray::TLocalDataAddress(arr, startPosition, idx);
*Result = IChunkedArray::TLocalDataAddress(ChunkedArray->chunk(idx), startPosition, idx);
}
};

Expand Down
13 changes: 5 additions & 8 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1397,11 +1397,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("size", i.second.size())("path_id", i.first);
for (auto&& p : i.second) {
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
}
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
continue;
}
{
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
Expand Down Expand Up @@ -1433,8 +1430,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
}
}
}
{
std::vector<NOlap::TIndexChunkLoadContext> indexes;
std::vector<NOlap::TIndexChunkLoadContext> indexes;
if (p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
Expand All @@ -1445,8 +1442,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
return false;
}
}
constructor.SetIndexes(std::move(indexes));
}
constructor.SetIndexes(std::move(indexes));
FetchedAccessors.emplace_back(std::move(constructor));
i.second.pop_back();
}
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ class TPathFetchingState {
AFL_VERIFY(Portions.emplace(portion->GetPortionId(), portion).second);
}

void AddAccessor(const TPortionDataAccessor& accessor) {
void AddAccessor(
const TPortionDataAccessor& accessor, const std::optional<std::set<ui32>>& columnIds, const std::optional<std::set<ui32>>& indexIds) {
AFL_VERIFY(Stage == EFetchStage::Fetching);
AFL_VERIFY(Portions.erase(accessor.GetPortionInfo().GetPortionId()));
AFL_VERIFY(PortionAccessors.emplace(accessor.GetPortionInfo().GetPortionId(), accessor).second);
AFL_VERIFY(PortionAccessors.emplace(accessor.GetPortionInfo().GetPortionId(), accessor.Extract(columnIds, indexIds)).second);
if (Portions.empty()) {
AFL_VERIFY(Stage == EFetchStage::Fetching);
Stage = EFetchStage::Fetched;
Expand Down Expand Up @@ -176,8 +177,8 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
THashMap<ui64, TPathFetchingState> PathIdStatus;
THashSet<ui64> PathIds;
TDataAccessorsResult AccessorsByPathId;
std::optional<std::vector<ui32>> ColumnIds;
std::optional<std::vector<ui32>> IndexIds;
YDB_READONLY_DEF(std::optional<std::set<ui32>>, ColumnIds);
std::optional<std::set<ui32>> IndexIds;

TAtomicCounter PreparingCount = 0;
TAtomicCounter FetchingCount = 0;
Expand All @@ -197,6 +198,11 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
}

public:
void SetColumnIds(const std::set<ui32>& columnIds) {
AFL_VERIFY(!ColumnIds);
ColumnIds = columnIds;
}

TString DebugString() const {
TStringBuilder sb;
sb << "request_id=" << RequestId << ";";
Expand Down Expand Up @@ -291,7 +297,7 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
{
auto itStatus = PathIdStatus.find(pathId);
AFL_VERIFY(itStatus != PathIdStatus.end());
itStatus->second.AddAccessor(accessor);
itStatus->second.AddAccessor(accessor, ColumnIds, IndexIds);
if (itStatus->second.IsFinished()) {
AFL_VERIFY(FetchingCount.Dec() >= 0);
ReadyCount.Inc();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> TPortionDataAccessor::TP
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto&& i : Columns) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("column", i.GetField()->ToString())("id", i.GetColumnId());
// NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("column", i.GetField()->ToString())("column_id", i.GetColumnId());
if (sequentialColumnIds.contains(i.GetColumnId())) {
columns.emplace_back(i.AssembleForSeqAccess());
} else {
Expand Down
55 changes: 55 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,61 @@ class TPortionDataAccessor {
(Indexes ? (Indexes->size() * sizeof(TIndexChunk)) : 0);
}

class TExtractContext {
private:
YDB_ACCESSOR_DEF(std::optional<std::set<ui32>>, ColumnIds);
YDB_ACCESSOR_DEF(std::optional<std::set<ui32>>, IndexIds);

public:
TExtractContext() = default;
};

TPortionDataAccessor Extract(const std::optional<std::set<ui32>>& columnIds, const std::optional<std::set<ui32>>& indexIds) const {
return Extract(TExtractContext().SetColumnIds(columnIds).SetIndexIds(indexIds));
}

TPortionDataAccessor Extract(const TExtractContext& context) const {
AFL_VERIFY(Records);
std::vector<TColumnRecord> extractedRecords;
if (context.GetColumnIds()) {
auto itRec = Records->begin();
auto itExt = context.GetColumnIds()->begin();
while (itRec != Records->end() && itExt != context.GetColumnIds()->end()) {
if (itRec->GetEntityId() == *itExt) {
extractedRecords.emplace_back(*itRec);
++itRec;
} else if (itRec->GetEntityId() < *itExt) {
++itRec;
} else {
++itExt;
}
}
} else {
extractedRecords = *Records;
}

AFL_VERIFY(Indexes);
std::vector<TIndexChunk> extractedIndexes;
if (context.GetIndexIds()) {
auto itIdx = Indexes->begin();
auto itExt = context.GetIndexIds()->begin();
while (itIdx != Indexes->end() && itExt != context.GetIndexIds()->end()) {
if (itIdx->GetEntityId() == *itExt) {
extractedIndexes.emplace_back(*itIdx);
++itIdx;
} else if (itIdx->GetEntityId() < *itExt) {
++itIdx;
} else {
++itExt;
}
}
} else {
extractedIndexes = *Indexes;
}

return TPortionDataAccessor(PortionInfo, std::move(extractedRecords), std::move(extractedIndexes), false);
}

const std::vector<TColumnRecord>& TestGetRecords() const {
AFL_VERIFY(Records);
return std::move(*Records);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
}

void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN) ("SelfId", SelfId())(
"TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen));
// TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN) ("SelfId", SelfId())(
// "TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen));
auto g = Stats->MakeGuard("bootstrap");
ScanActorId = ctx.SelfID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <util/string/join.h>
#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>

namespace NKikimr::NOlap::NReader::NPlain {
namespace NKikimr::NOlap::NReader::NCommon {

TString TColumnsSet::DebugString() const {
return TStringBuilder() << "("
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include <util/string/join.h>

namespace NKikimr::NOlap::NReader::NSimple {
namespace NKikimr::NOlap::NReader::NCommon {

enum class EMemType {
Blob,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "fetched_data.h"

#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
#include <ydb/library/formats/arrow/common/validation.h>
#include <ydb/library/formats/arrow/simple_arrays_cache.h>

namespace NKikimr::NOlap::NReader::NCommon {

void TFetchedData::SyncTableColumns(const std::vector<std::shared_ptr<arrow::Field>>& fields, const ISnapshotSchema& schema) {
for (auto&& i : fields) {
if (Table->GetSchema()->GetFieldByName(i->name())) {
continue;
}
Table
->AddField(i, std::make_shared<NArrow::NAccessor::TTrivialArray>(NArrow::TThreadSimpleArraysCache::Get(
i->type(), schema.GetExternalDefaultValueVerified(i->name()), Table->num_rows())))
.Validate();
}
}

} // namespace NKikimr::NOlap
Loading

0 comments on commit 6fc18f8

Please sign in to comment.