Skip to content

Commit

Permalink
skip not used insertions
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Sep 11, 2024
1 parent b7b8628 commit 4fbcd53
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 30 deletions.
23 changes: 10 additions & 13 deletions ydb/core/tx/columnshard/engines/insert_table/committed.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,24 @@ class TCommittedBlob {
YDB_READONLY(ui64, SchemaVersion, 0);
YDB_READONLY(ui64, RecordsCount, 0);
YDB_READONLY(bool, IsDelete, false);
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First);
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, Last);
NArrow::TReplaceKey First;
NArrow::TReplaceKey Last;
YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset);

public:
ui64 GetSize() const {
return BlobRange.Size;
const NArrow::TReplaceKey& GetFirst() const {
return First;
}

const NArrow::TReplaceKey& GetFirstVerified() const {
Y_ABORT_UNLESS(First);
return *First;
const NArrow::TReplaceKey& GetLast() const {
return Last;
}

const NArrow::TReplaceKey& GetLastVerified() const {
Y_ABORT_UNLESS(Last);
return *Last;
ui64 GetSize() const {
return BlobRange.Size;
}

TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount,
const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last, const bool isDelete,
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
const NArrow::TSchemaSubset& subset)
: BlobRange(blobRange)
, WriteInfo(snapshot)
Expand All @@ -89,7 +86,7 @@ class TCommittedBlob {
}

TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount,
const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last, const bool isDelete,
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
const NArrow::TSchemaSubset& subset)
: BlobRange(blobRange)
, WriteInfo(writeId)
Expand Down
31 changes: 25 additions & 6 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) {
dbTable.Insert(*dataPtr);
return true;
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_insertion");
return false;
}
}
Expand All @@ -39,11 +40,15 @@ TInsertionSummary::TCounters TInsertTable::Commit(
auto* pathInfo = Summary.GetPathInfoOptional(pathId);
// There could be commit after drop: propose, drop, plan
if (pathInfo && pathExists(pathId)) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", data->GetPathId())(
"blob_range", data->GetBlobRange().ToString());
auto committed = data->Commit(planStep, txId);
dbTable.Commit(committed);

pathInfo->AddCommitted(std::move(committed));
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())(
"blob_range", data->GetBlobRange().ToString());
dbTable.Abort(*data);
Summary.AddAborted(std::move(*data));
}
Expand All @@ -58,6 +63,8 @@ void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& wr
for (auto writeId : writeIds) {
// There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId.
if (std::optional<TInsertedData> data = Summary.ExtractInserted(writeId)) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())(
"blob_range", data->GetBlobRange().ToString())("write_id", writeId);
dbTable.EraseInserted(*data);
dbTable.Abort(*data);
Summary.AddAborted(std::move(*data));
Expand Down Expand Up @@ -108,8 +115,8 @@ bool TInsertTable::Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant
return dbTable.Load(*this, loadTime);
}

std::vector<TCommittedBlob> TInsertTable::Read(
ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const {
std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot,
const std::shared_ptr<arrow::Schema>& pkSchema, const TPKRangesFilter* pkRangesFilter) const {
const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId);
if (!pInfo) {
return {};
Expand All @@ -120,15 +127,27 @@ std::vector<TCommittedBlob> TInsertTable::Read(

for (const auto& data : pInfo->GetCommitted()) {
if (lockId || data.GetSnapshot() <= reqSnapshot) {
auto start = data.GetMeta().GetFirstPK(pkSchema);
auto finish = data.GetMeta().GetLastPK(pkSchema);
AFL_VERIFY(start);
AFL_VERIFY(finish);
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(*start, *finish) == TPKRangeFilter::EUsageClass::DontUsage) {
continue;
}
result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema),
data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
*start, *finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
}
}
if (lockId) {
for (const auto& [writeId, data] : pInfo->GetInserted()) {
result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema),
auto start = data.GetMeta().GetFirstPK(pkSchema);
auto finish = data.GetMeta().GetLastPK(pkSchema);
AFL_VERIFY(start);
AFL_VERIFY(finish);
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(*start, *finish) == TPKRangeFilter::EUsageClass::DontUsage) {
continue;
}
result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), *start, *finish,
data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
}
}
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <ydb/core/tx/columnshard/counters/insert_table.h>

namespace NKikimr::NOlap {

class TPKRangesFilter;
class IDbWrapper;

/// Use one table for inserted and committed blobs:
Expand Down Expand Up @@ -57,6 +57,7 @@ class TInsertTableAccessor {
return Summary.AddInserted(std::move(data), load);
}
bool AddAborted(TInsertedData&& data, const bool load) {
AFL_VERIFY_DEBUG(!Summary.ExtractInserted(data.GetInsertWriteId()));
if (load) {
AddBlobLink(data.GetBlobRange().BlobId);
}
Expand Down Expand Up @@ -114,8 +115,8 @@ class TInsertTable: public TInsertTableAccessor {
void EraseAbortedOnExecute(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr<IBlobsDeclareRemovingAction>& blobsAction);
void EraseAbortedOnComplete(const TInsertedData& key);

std::vector<TCommittedBlob> Read(
ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const;
std::vector<TCommittedBlob> Read(ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot,
const std::shared_ptr<arrow::Schema>& pkSchema, const TPKRangesFilter* pkRangesFilter) const;
bool Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant loadTime);

TInsertWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ bool TInsertionSummary::HasCommitted(const TCommittedData& data) {
/// Registers `data` in the `Aborted` map under its insert-write id.
///
/// The blob size is added to `Counters.Aborted` (forwarding `load`) before the
/// record is moved into the map.
///
/// @param data  record to store; it is moved into `Aborted`.
/// @param load  forwarded to `Counters.Aborted.Add`.
/// @return pointer to the record now stored in `Aborted`.
const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) {
    // Read everything needed from `data` before it is moved from below.
    const TInsertWriteId writeId = data.GetInsertWriteId();
    Counters.Aborted.Add(data.BlobSize(), load);
    // The same write id must not be tracked as inserted and aborted at once.
    AFL_VERIFY_DEBUG(!Inserted.contains(writeId));
    auto insertInfo = Aborted.emplace(writeId, std::move(data));
    // Single uniqueness check that carries the offending write id in its diagnostic;
    // a preceding context-free abort would hide this information.
    AFL_VERIFY(insertInfo.second)("write_id", writeId);
    return &insertInfo.first->second;
}

Expand All @@ -191,6 +192,7 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedDat
const ui32 dataSize = data.BlobSize();
const ui64 pathId = data.GetPathId();
auto insertInfo = Inserted.emplace(writeId, std::move(data));
AFL_VERIFY_DEBUG(!Aborted.contains(writeId));
if (insertInfo.second) {
OnNewInserted(GetPathInfo(pathId), dataSize, load);
return &insertInfo.first->second;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ ISnapshotSchema::TPtr TReadMetadataBase::GetLoadSchemaVerified(const TPortionInf

/// Collects blobs for `readDescription.PathId` by delegating to `InsertTable->Read`,
/// passing the read's PK ranges filter along with the lock id, snapshot and PK schema.
///
/// `readDescription.PKRangesFilter` must be set; this is verified before the call.
///
/// @return the vector produced by `InsertTable->Read`.
std::vector<TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const TReadDescription& readDescription,
    const std::shared_ptr<arrow::Schema>& pkSchema, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot) const {
    AFL_VERIFY(readDescription.PKRangesFilter);
    // Return the call result directly: wrapping a prvalue in std::move would
    // disable copy elision and force an extra move of the vector.
    return InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema, &*readDescription.PKRangesFilter);
}

} // namespace NKikimr::NOlap::NReader
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
if (GetReadMetadata()->IsMyUncommitted(i.GetWriteIdVerified())) {
continue;
}
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirstVerified()) ||
GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLastVerified())) {
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) ||
GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
GetReadMetadata()->SetConflictedWriteId(i.GetWriteIdVerified());
}
}
Expand All @@ -39,7 +39,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
if (GetReadMetadata()->IsWriteConflictable(i.GetWriteIdVerified())) {
continue;
}
} else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirstVerified(), i.GetLastVerified()) ==
} else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirst(), i.GetLast()) ==
TPKRangeFilter::EUsageClass::DontUsage) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,8 @@ class TCommittedDataSource: public IDataSource {
}

TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context)
: TBase(sourceIdx, context, committed.GetFirstVerified(), committed.GetLastVerified(), committed.GetSnapshotDef(TSnapshot::Zero()),
committed.GetSnapshotDef(TSnapshot::Zero()),
committed.GetRecordsCount(), {}, committed.GetIsDelete())
: TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetSnapshotDef(TSnapshot::Zero()),
committed.GetSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete())
, CommittedBlob(committed) {
}
};
Expand Down

0 comments on commit 4fbcd53

Please sign in to comment.