diff --git a/ydb/core/tx/columnshard/engines/insert_table/committed.h b/ydb/core/tx/columnshard/engines/insert_table/committed.h index ed2ffdadb3ca..bd633647b5ec 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/committed.h +++ b/ydb/core/tx/columnshard/engines/insert_table/committed.h @@ -56,27 +56,24 @@ class TCommittedBlob { YDB_READONLY(ui64, SchemaVersion, 0); YDB_READONLY(ui64, RecordsCount, 0); YDB_READONLY(bool, IsDelete, false); - YDB_READONLY_DEF(std::optional, First); - YDB_READONLY_DEF(std::optional, Last); + NArrow::TReplaceKey First; + NArrow::TReplaceKey Last; YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset); public: - ui64 GetSize() const { - return BlobRange.Size; + const NArrow::TReplaceKey& GetFirst() const { + return First; } - - const NArrow::TReplaceKey& GetFirstVerified() const { - Y_ABORT_UNLESS(First); - return *First; + const NArrow::TReplaceKey& GetLast() const { + return Last; } - const NArrow::TReplaceKey& GetLastVerified() const { - Y_ABORT_UNLESS(Last); - return *Last; + ui64 GetSize() const { + return BlobRange.Size; } TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount, - const std::optional& first, const std::optional& last, const bool isDelete, + const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete, const NArrow::TSchemaSubset& subset) : BlobRange(blobRange) , WriteInfo(snapshot) @@ -89,7 +86,7 @@ class TCommittedBlob { } TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount, - const std::optional& first, const std::optional& last, const bool isDelete, + const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete, const NArrow::TSchemaSubset& subset) : BlobRange(blobRange) , WriteInfo(writeId) diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index a948c0077077..18e69a2b0f55 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -13,6 +13,7 @@ bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { dbTable.Insert(*dataPtr); return true; } else { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_insertion"); return false; } } @@ -39,11 +40,15 @@ TInsertionSummary::TCounters TInsertTable::Commit( auto* pathInfo = Summary.GetPathInfoOptional(pathId); // There could be commit after drop: propose, drop, plan if (pathInfo && pathExists(pathId)) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", data->GetPathId())( + "blob_range", data->GetBlobRange().ToString()); auto committed = data->Commit(planStep, txId); dbTable.Commit(committed); pathInfo->AddCommitted(std::move(committed)); } else { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())( + "blob_range", data->GetBlobRange().ToString()); dbTable.Abort(*data); Summary.AddAborted(std::move(*data)); } @@ -58,6 +63,8 @@ void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet& wr for (auto writeId : writeIds) { // There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId. if (std::optional data = Summary.ExtractInserted(writeId)) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())( + "blob_range", data->GetBlobRange().ToString())("write_id", writeId); dbTable.EraseInserted(*data); dbTable.Abort(*data); Summary.AddAborted(std::move(*data)); @@ -108,8 +115,8 @@ bool TInsertTable::Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant return dbTable.Load(*this, loadTime); } -std::vector TInsertTable::Read( - ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, const std::shared_ptr& pkSchema) const { +std::vector TInsertTable::Read(ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, + const std::shared_ptr& pkSchema, const TPKRangesFilter* pkRangesFilter) const { const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId); if (!pInfo) { return {}; @@ -120,15 +127,27 @@ std::vector TInsertTable::Read( for (const auto& data : pInfo->GetCommitted()) { if (lockId || data.GetSnapshot() <= reqSnapshot) { + auto start = data.GetMeta().GetFirstPK(pkSchema); + auto finish = data.GetMeta().GetLastPK(pkSchema); + AFL_VERIFY(start); + AFL_VERIFY(finish); + if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(*start, *finish) == TPKRangeFilter::EUsageClass::DontUsage) { + continue; + } result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), - data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema), - data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); + *start, *finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } } if (lockId) { for (const auto& [writeId, data] : pInfo->GetInserted()) { - result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), - data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema), + auto start = data.GetMeta().GetFirstPK(pkSchema); + auto finish = data.GetMeta().GetLastPK(pkSchema); + AFL_VERIFY(start); + AFL_VERIFY(finish); + if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(*start, *finish) == TPKRangeFilter::EUsageClass::DontUsage) { + continue; + } + result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), *start, *finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index 324cbbf87946..b44e64312191 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -9,7 +9,7 @@ #include namespace NKikimr::NOlap { - +class TPKRangesFilter; class IDbWrapper; /// Use one table for inserted and committed blobs: @@ -57,6 +57,7 @@ class TInsertTableAccessor { return Summary.AddInserted(std::move(data), load); } bool AddAborted(TInsertedData&& data, const bool load) { + AFL_VERIFY_DEBUG(!Summary.ExtractInserted(data.GetInsertWriteId())); if (load) { AddBlobLink(data.GetBlobRange().BlobId); } @@ -114,8 +115,8 @@ class TInsertTable: public TInsertTableAccessor { void EraseAbortedOnExecute(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr& blobsAction); void EraseAbortedOnComplete(const TInsertedData& key); - std::vector Read( - ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, const std::shared_ptr& pkSchema) const; + std::vector Read(ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, + const std::shared_ptr& pkSchema, const TPKRangesFilter* pkRangesFilter) const; bool Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant loadTime); TInsertWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc); diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index 8012211c418e..3ad39fcd209e 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -166,8 +166,9 @@ bool TInsertionSummary::HasCommitted(const TCommittedData& data) { const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) { const TInsertWriteId writeId = data.GetInsertWriteId(); Counters.Aborted.Add(data.BlobSize(), load); + AFL_VERIFY_DEBUG(!Inserted.contains(writeId)); auto insertInfo = Aborted.emplace(writeId, std::move(data)); - Y_ABORT_UNLESS(insertInfo.second); + AFL_VERIFY(insertInfo.second)("write_id", writeId); return &insertInfo.first->second; } @@ -191,6 +192,7 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedDat const ui32 dataSize = data.BlobSize(); const ui64 pathId = data.GetPathId(); auto insertInfo = Inserted.emplace(writeId, std::move(data)); + AFL_VERIFY_DEBUG(!Aborted.contains(writeId)); if (insertInfo.second) { OnNewInserted(GetPathInfo(pathId), dataSize, load); return &insertInfo.first->second; diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp index dd01511460c2..88416a4d214f 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp @@ -25,7 +25,8 @@ ISnapshotSchema::TPtr TReadMetadataBase::GetLoadSchemaVerified(const TPortionInf std::vector TDataStorageAccessor::GetCommitedBlobs(const TReadDescription& readDescription, const std::shared_ptr& pkSchema, const std::optional lockId, const TSnapshot& reqSnapshot) const { - return std::move(InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema)); + AFL_VERIFY(readDescription.PKRangesFilter); + return std::move(InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema, &*readDescription.PKRangesFilter)); } } // namespace NKikimr::NOlap::NReader diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp index ec228c363b90..04ed0d1c6f26 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp @@ -28,8 +28,8 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) if (GetReadMetadata()->IsMyUncommitted(i.GetWriteIdVerified())) { continue; } - if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirstVerified()) || - GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLastVerified())) { + if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) || + GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) { GetReadMetadata()->SetConflictedWriteId(i.GetWriteIdVerified()); } } @@ -39,7 +39,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) if (GetReadMetadata()->IsWriteConflictable(i.GetWriteIdVerified())) { continue; } - } else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirstVerified(), i.GetLastVerified()) == + } else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirst(), i.GetLast()) == TPKRangeFilter::EUsageClass::DontUsage) { continue; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 8121c73a6296..879c42d142a6 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -456,9 +456,8 @@ class TCommittedDataSource: public IDataSource { } TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr& context) - : TBase(sourceIdx, context, committed.GetFirstVerified(), committed.GetLastVerified(), committed.GetSnapshotDef(TSnapshot::Zero()), - committed.GetSnapshotDef(TSnapshot::Zero()), - committed.GetRecordsCount(), {}, committed.GetIsDelete()) + : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetSnapshotDef(TSnapshot::Zero()), + committed.GetSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete()) , CommittedBlob(committed) { } };