Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

speed up scan initialization #9086

Merged
23 changes: 10 additions & 13 deletions ydb/core/tx/columnshard/engines/insert_table/committed.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,24 @@ class TCommittedBlob {
YDB_READONLY(ui64, SchemaVersion, 0);
YDB_READONLY(ui64, RecordsCount, 0);
YDB_READONLY(bool, IsDelete, false);
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First);
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, Last);
NArrow::TReplaceKey First;
NArrow::TReplaceKey Last;
YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset);

public:
ui64 GetSize() const {
return BlobRange.Size;
const NArrow::TReplaceKey& GetFirst() const {
return First;
}

const NArrow::TReplaceKey& GetFirstVerified() const {
Y_ABORT_UNLESS(First);
return *First;
const NArrow::TReplaceKey& GetLast() const {
return Last;
}

const NArrow::TReplaceKey& GetLastVerified() const {
Y_ABORT_UNLESS(Last);
return *Last;
ui64 GetSize() const {
return BlobRange.Size;
}

TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount,
const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last, const bool isDelete,
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
const NArrow::TSchemaSubset& subset)
: BlobRange(blobRange)
, WriteInfo(snapshot)
Expand All @@ -89,7 +86,7 @@ class TCommittedBlob {
}

TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount,
const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last, const bool isDelete,
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
const NArrow::TSchemaSubset& subset)
: BlobRange(blobRange)
, WriteInfo(writeId)
Expand Down
27 changes: 21 additions & 6 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) {
dbTable.Insert(*dataPtr);
return true;
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_insertion");
return false;
}
}
Expand All @@ -39,11 +40,15 @@ TInsertionSummary::TCounters TInsertTable::Commit(
auto* pathInfo = Summary.GetPathInfoOptional(pathId);
// There could be commit after drop: propose, drop, plan
if (pathInfo && pathExists(pathId)) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", data->GetPathId())(
"blob_range", data->GetBlobRange().ToString());
auto committed = data->Commit(planStep, txId);
dbTable.Commit(committed);

pathInfo->AddCommitted(std::move(committed));
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())(
"blob_range", data->GetBlobRange().ToString());
dbTable.Abort(*data);
Summary.AddAborted(std::move(*data));
}
Expand All @@ -58,6 +63,8 @@ void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& wr
for (auto writeId : writeIds) {
// There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId.
if (std::optional<TInsertedData> data = Summary.ExtractInserted(writeId)) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())(
"blob_range", data->GetBlobRange().ToString())("write_id", writeId);
dbTable.EraseInserted(*data);
dbTable.Abort(*data);
Summary.AddAborted(std::move(*data));
Expand Down Expand Up @@ -108,8 +115,8 @@ bool TInsertTable::Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant
return dbTable.Load(*this, loadTime);
}

std::vector<TCommittedBlob> TInsertTable::Read(
ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const {
std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot,
const std::shared_ptr<arrow::Schema>& pkSchema, const TPKRangesFilter* pkRangesFilter) const {
const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId);
if (!pInfo) {
return {};
Expand All @@ -120,15 +127,23 @@ std::vector<TCommittedBlob> TInsertTable::Read(

for (const auto& data : pInfo->GetCommitted()) {
if (lockId || data.GetSnapshot() <= reqSnapshot) {
auto start = data.GetMeta().GetFirstPK(pkSchema);
auto finish = data.GetMeta().GetLastPK(pkSchema);
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {
continue;
}
result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema),
data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
start, finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
}
}
if (lockId) {
for (const auto& [writeId, data] : pInfo->GetInserted()) {
result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema),
auto start = data.GetMeta().GetFirstPK(pkSchema);
auto finish = data.GetMeta().GetLastPK(pkSchema);
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {
continue;
}
result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), start, finish,
data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
}
}
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <ydb/core/tx/columnshard/counters/insert_table.h>

namespace NKikimr::NOlap {

class TPKRangesFilter;
class IDbWrapper;

/// Use one table for inserted and committed blobs:
Expand Down Expand Up @@ -57,6 +57,7 @@ class TInsertTableAccessor {
return Summary.AddInserted(std::move(data), load);
}
bool AddAborted(TInsertedData&& data, const bool load) {
AFL_VERIFY_DEBUG(!Summary.ExtractInserted(data.GetInsertWriteId()));
if (load) {
AddBlobLink(data.GetBlobRange().BlobId);
}
Expand Down Expand Up @@ -114,8 +115,8 @@ class TInsertTable: public TInsertTableAccessor {
void EraseAbortedOnExecute(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr<IBlobsDeclareRemovingAction>& blobsAction);
void EraseAbortedOnComplete(const TInsertedData& key);

std::vector<TCommittedBlob> Read(
ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const;
std::vector<TCommittedBlob> Read(ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot,
const std::shared_ptr<arrow::Schema>& pkSchema, const TPKRangesFilter* pkRangesFilter) const;
bool Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant loadTime);

TInsertWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ bool TInsertionSummary::HasCommitted(const TCommittedData& data) {
const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) {
const TInsertWriteId writeId = data.GetInsertWriteId();
Counters.Aborted.Add(data.BlobSize(), load);
AFL_VERIFY_DEBUG(!Inserted.contains(writeId));
auto insertInfo = Aborted.emplace(writeId, std::move(data));
Y_ABORT_UNLESS(insertInfo.second);
AFL_VERIFY(insertInfo.second)("write_id", writeId);
return &insertInfo.first->second;
}

Expand All @@ -191,6 +192,7 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedDat
const ui32 dataSize = data.BlobSize();
const ui64 pathId = data.GetPathId();
auto insertInfo = Inserted.emplace(writeId, std::move(data));
AFL_VERIFY_DEBUG(!Aborted.contains(writeId));
if (insertInfo.second) {
OnNewInserted(GetPathInfo(pathId), dataSize, load);
return &insertInfo.first->second;
Expand Down
39 changes: 29 additions & 10 deletions ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,57 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
return result;
}

ui64 TPortionInfo::GetColumnRawBytes(const std::vector<ui32>& columnIds, const bool validation) const {
return GetColumnRawBytes(std::set<ui32>(columnIds.begin(), columnIds.end()), validation);
// Returns the sum of GetMeta().GetRawBytes() over the column records in Records that
// AggregateIndexChunksData hands to the aggregator.
// entityIds is passed by address as the column-id set; presumably only records whose id is
// in the set are visited -- TODO confirm against AggregateIndexChunksData.
// validation is forwarded unchanged to AggregateIndexChunksData.
ui64 TPortionInfo::GetColumnRawBytes(const std::set<ui32>& entityIds, const bool validation) const {
ui64 sum = 0;
// Aggregator callback: accumulates the raw byte size of each visited column record into sum.
const auto aggr = [&](const TColumnRecord& r) {
sum += r.GetMeta().GetRawBytes();
};
AggregateIndexChunksData(aggr, Records, &entityIds, validation);
return sum;
}

ui64 TPortionInfo::GetColumnRawBytes(const std::optional<std::set<ui32>>& entityIds, const bool validation) const {
// Returns the sum of GetBlobRange().GetSize() over the column records in Records that
// AggregateIndexChunksData hands to the aggregator.
// entityIds is passed by address as the column-id set; presumably only records whose id is
// in the set are visited -- TODO confirm against AggregateIndexChunksData.
// validation is forwarded unchanged to AggregateIndexChunksData.
ui64 TPortionInfo::GetColumnBlobBytes(const std::set<ui32>& entityIds, const bool validation) const {
ui64 sum = 0;
// Aggregator callback: accumulates the blob-range size of each visited column record into sum.
const auto aggr = [&](const TColumnRecord& r) {
sum += r.GetBlobRange().GetSize();
};
AggregateIndexChunksData(aggr, Records, &entityIds, validation);
return sum;
}

ui64 TPortionInfo::GetColumnRawBytes(const bool validation) const {
ui64 sum = 0;
const auto aggr = [&](const TColumnRecord& r) {
sum += r.GetMeta().GetRawBytes();
};
AggregateIndexChunksData(aggr, Records, entityIds, validation);
AggregateIndexChunksData(aggr, Records, nullptr, validation);
return sum;
}

ui64 TPortionInfo::GetColumnBlobBytes(const std::optional<std::set<ui32>>& entityIds, const bool validation) const {
ui64 TPortionInfo::GetColumnBlobBytes(const bool validation) const {
ui64 sum = 0;
const auto aggr = [&](const TColumnRecord& r) {
sum += r.GetBlobRange().GetSize();
};
AggregateIndexChunksData(aggr, Records, entityIds, validation);
AggregateIndexChunksData(aggr, Records, nullptr, validation);
return sum;
}

ui64 TPortionInfo::GetColumnBlobBytes(const std::vector<ui32>& columnIds, const bool validation) const {
return GetColumnBlobBytes(std::set<ui32>(columnIds.begin(), columnIds.end()), validation);
// Returns the sum of GetRawBytes() over the index chunks in Indexes that
// AggregateIndexChunksData hands to the aggregator.
// entityIds is passed by address as the id set; presumably only chunks whose id is in the
// set are visited -- TODO confirm against AggregateIndexChunksData.
// validation is forwarded unchanged to AggregateIndexChunksData.
ui64 TPortionInfo::GetIndexRawBytes(const std::set<ui32>& entityIds, const bool validation) const {
ui64 sum = 0;
// Aggregator callback: accumulates the raw byte size of each visited index chunk into sum.
const auto aggr = [&](const TIndexChunk& r) {
sum += r.GetRawBytes();
};
AggregateIndexChunksData(aggr, Indexes, &entityIds, validation);
return sum;
}

ui64 TPortionInfo::GetIndexRawBytes(const std::optional<std::set<ui32>>& entityIds, const bool validation) const {
ui64 TPortionInfo::GetIndexRawBytes(const bool validation) const {
ui64 sum = 0;
const auto aggr = [&](const TIndexChunk& r) {
sum += r.GetRawBytes();
};
AggregateIndexChunksData(aggr, Indexes, entityIds, validation);
AggregateIndexChunksData(aggr, Indexes, nullptr, validation);
return sum;
}

Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class TPortionInfo {
}

template <class TAggregator, class TChunkInfo>
static void AggregateIndexChunksData(const TAggregator& aggr, const std::vector<TChunkInfo>& chunks, const std::optional<std::set<ui32>>& columnIds, const bool validation) {
static void AggregateIndexChunksData(const TAggregator& aggr, const std::vector<TChunkInfo>& chunks, const std::set<ui32>* columnIds, const bool validation) {
if (columnIds) {
auto itColumn = columnIds->begin();
auto itRecord = chunks.begin();
Expand Down Expand Up @@ -537,7 +537,8 @@ class TPortionInfo {
return result;
}

ui64 GetIndexRawBytes(const std::optional<std::set<ui32>>& columnIds = {}, const bool validation = true) const;
ui64 GetIndexRawBytes(const std::set<ui32>& columnIds, const bool validation = true) const;
ui64 GetIndexRawBytes(const bool validation = true) const;
ui64 GetIndexBlobBytes() const noexcept {
ui64 sum = 0;
for (const auto& rec : Indexes) {
Expand All @@ -546,11 +547,11 @@ class TPortionInfo {
return sum;
}

ui64 GetColumnRawBytes(const std::vector<ui32>& columnIds, const bool validation = true) const;
ui64 GetColumnRawBytes(const std::optional<std::set<ui32>>& columnIds = {}, const bool validation = true) const;
ui64 GetColumnRawBytes(const std::set<ui32>& columnIds, const bool validation = true) const;
ui64 GetColumnRawBytes(const bool validation = true) const;

ui64 GetColumnBlobBytes(const std::vector<ui32>& columnIds, const bool validation = true) const;
ui64 GetColumnBlobBytes(const std::optional<std::set<ui32>>& columnIds = {}, const bool validation = true) const;
ui64 GetColumnBlobBytes(const std::set<ui32>& columnIds, const bool validation = true) const;
ui64 GetColumnBlobBytes(const bool validation = true) const;

ui64 GetTotalBlobBytes() const noexcept {
return GetIndexBlobBytes() + GetColumnBlobBytes();
Expand Down
20 changes: 1 addition & 19 deletions ydb/core/tx/columnshard/engines/predicate/range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,7 @@ NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(const arrow::Datum& d
}

bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info) const {
if (const auto& from = PredicateFrom.GetReplaceKey()) {
const auto& portionEnd = info.IndexKeyEnd();
const int commonSize = std::min(from->Size(), portionEnd.Size());
if (std::is_gt(from->ComparePartNotNull(portionEnd, commonSize))) {
return false;
}
}

if (const auto& to = PredicateTo.GetReplaceKey()) {
const auto& portionStart = info.IndexKeyStart();
const int commonSize = std::min(to->Size(), portionStart.Size());
if (std::is_lt(to->ComparePartNotNull(portionStart, commonSize))) {
return false;
}
}
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", info.IndexKeyStart().DebugString())("end", info.IndexKeyEnd().DebugString())(
// "from", PredicateFrom.DebugString())("to", PredicateTo.DebugString());

return true;
return IsPortionInPartialUsage(info.IndexKeyStart(), info.IndexKeyEnd()) != TPKRangeFilter::EUsageClass::DontUsage;
}

TPKRangeFilter::EUsageClass TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ ISnapshotSchema::TPtr TReadMetadataBase::GetLoadSchemaVerified(const TPortionInf

std::vector<TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const TReadDescription& readDescription,
const std::shared_ptr<arrow::Schema>& pkSchema, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot) const {
return std::move(InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema));
AFL_VERIFY(readDescription.PKRangesFilter);
return std::move(InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema, &*readDescription.PKRangesFilter));
}

} // namespace NKikimr::NOlap::NReader
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ struct TReadMetadata : public TReadMetadataBase {
return GetProgram().HasProcessingColumnIds();
}

// Read-only accessor: returns PathId as ui64.
// NOTE(review): PathId is declared outside the visible hunk (likely in this struct or its base) -- confirm.
ui64 GetPathId() const {
return PathId;
}

std::shared_ptr<TSelectInfo> SelectInfo;
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
std::vector<TCommittedBlob> CommittedBlobs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,16 @@ ui64 TSpecialReadContext::GetMemoryForSources(const THashMap<ui32, std::shared_p
std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) {
const bool needSnapshots = !source->GetExclusiveIntervalOnly() || ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax() ||
!source->IsSourceInMemory();
bool partialUsageByPK = false;
{
const TPKRangeFilter::EUsageClass usage =
ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey());
switch (usage) {
const bool partialUsageByPK = [&]() {
switch (source->GetUsageClass()) {
case TPKRangeFilter::EUsageClass::PartialUsage:
partialUsageByPK = true;
break;
return true;
case TPKRangeFilter::EUsageClass::DontUsage:
partialUsageByPK = true;
break;
return true;
case TPKRangeFilter::EUsageClass::FullUsage:
partialUsageByPK = false;
break;
return false;
}
}
}();
const bool useIndexes = (IndexChecker ? source->HasIndexes(IndexChecker->GetIndexIds()) : false);
const bool isWholeExclusiveSource = source->GetExclusiveIntervalOnly() && source->IsSourceInMemory();
const bool hasDeletions = source->GetHasDeletions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
if (GetReadMetadata()->IsMyUncommitted(i.GetWriteIdVerified())) {
continue;
}
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirstVerified()) ||
GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLastVerified())) {
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) ||
GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
GetReadMetadata()->SetConflictedWriteId(i.GetWriteIdVerified());
}
}
Expand All @@ -39,7 +39,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
if (GetReadMetadata()->IsWriteConflictable(i.GetWriteIdVerified())) {
continue;
}
} else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirstVerified(), i.GetLastVerified()) ==
} else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirst(), i.GetLast()) ==
TPKRangeFilter::EUsageClass::DontUsage) {
continue;
}
Expand Down
Loading
Loading