From 6862d3c0e8769366a7221ccabe621a18ff0549bf Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 23 Sep 2024 12:09:29 +0300 Subject: [PATCH] fix mvcc tests. use write id as row feature for conflicts resolving (#9598) --- ydb/core/formats/arrow/arrow_filter.cpp | 6 +- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 4 +- ydb/core/protos/feature_flags.proto | 3 +- ydb/core/tx/columnshard/columnshard_schema.h | 67 +++++++------- ydb/core/tx/columnshard/common/portion.h | 4 +- .../engines/changes/compaction/merger.cpp | 11 +-- .../engines/changes/general_compaction.cpp | 1 + .../engines/changes/indexation.cpp | 2 +- .../engines/insert_table/committed.h | 88 ++++++++++--------- .../engines/insert_table/insert_table.cpp | 2 +- .../constructor/read_metadata.cpp | 10 +-- .../plain_reader/iterator/plain_read_data.cpp | 10 +-- .../reader/plain_reader/iterator/source.cpp | 13 ++- .../reader/plain_reader/iterator/source.h | 10 +-- .../engines/scheme/abstract/index_info.cpp | 15 +++- .../engines/scheme/abstract/index_info.h | 30 ++++--- .../columnshard/engines/ut/ut_logs_engine.cpp | 2 +- 17 files changed, 156 insertions(+), 122 deletions(-) diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index c404a016f4bd..62b47a66e24c 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -575,8 +575,10 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter } TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const { - if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) { - return TIterator(reverse, expectedSize, LastValue); + if (IsTotalAllowFilter()) { + return TIterator(reverse, expectedSize, true); + } else if (IsTotalDenyFilter()) { + return TIterator(reverse, expectedSize, false); } else { AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse); return TIterator(reverse, Filter, GetStartValue(reverse)); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 62e345ba8258..ec81be006de5 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -7903,10 +7903,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;\"-321\";\"-3.14\";[\"test_res_3\"]]]"); testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[\"-3.14\"]]"); testHelper.ReadData("SELECT resource_id FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]"); - testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]"); + testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]"); testHelper.RebootTablets(testTable.GetName()); - testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]"); + testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]"); } Y_UNIT_TEST(AddColumnErrors) { diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 52b2398d7f02..f8c4d1c49aa3 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -162,5 +162,6 @@ message TFeatureFlags { optional bool EnableExternalDataSourcesOnServerless = 143 [default = true]; optional bool EnableSparsedColumns = 144 [default = false]; optional bool EnableParameterizedDecimal = 145 [default = false]; - optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false]; + optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false]; + optional bool EnableInsertWriteIdSpecialColumnCompatibility = 147 [default = false]; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 8b104b9dcd58..4f08426ddd70 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -306,9 +306,10 @@ struct Schema : NIceDb::Schema { struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {}; struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {}; + struct InsertWriteId: Column<13, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey; - using TColumns = TableColumns; + using TColumns = TableColumns; }; struct IndexGranules : NIceDb::Schema::Table { @@ -808,6 +809,7 @@ struct Schema : NIceDb::Schema { .Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "") .Update(NIceDb::TUpdate(data.GetBlobRange().GetBlobId().ToStringLegacy()), NIceDb::TUpdate(data.GetBlobRange().Offset), + NIceDb::TUpdate((ui64)data.GetInsertWriteId()), NIceDb::TUpdate(data.GetBlobRange().Size), NIceDb::TUpdate(data.GetMeta().SerializeToProto().SerializeAsString()), NIceDb::TUpdate(data.GetSchemaVersion())); @@ -818,6 +820,7 @@ struct Schema : NIceDb::Schema { .Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(), data.GetDedupId()) .Update(NIceDb::TUpdate(data.GetBlobRange().GetBlobId().ToStringLegacy()), + NIceDb::TUpdate((ui64)data.GetInsertWriteId()), NIceDb::TUpdate(data.GetBlobRange().Offset), NIceDb::TUpdate(data.GetBlobRange().Size), NIceDb::TUpdate(data.GetMeta().SerializeToProto().SerializeAsString()), @@ -982,6 +985,7 @@ class TInsertTableRecordLoadContext { NColumnShard::Schema::EInsertTableIds RecType; ui64 PlanStep; ui64 WriteTxId; + TInsertWriteId InsertWriteId; ui64 PathId; YDB_ACCESSOR_DEF(TString, DedupId); ui64 SchemaVersion; @@ -989,8 +993,8 @@ class TInsertTableRecordLoadContext { std::optional BlobId; TString MetadataString; std::optional Metadata; - std::optional RangeOffset; - std::optional RangeSize; + ui64 RangeOffset; + ui64 RangeSize; void Prepare(const IBlobGroupSelector* dsGroupSelector) { AFL_VERIFY(!PreparedFlag); @@ -1004,7 +1008,6 @@ class TInsertTableRecordLoadContext { AFL_VERIFY(MetadataString); Y_ABORT_UNLESS(meta.ParseFromString(MetadataString)); Metadata = std::move(meta); - AFL_VERIFY(!!RangeOffset == !!RangeSize); } bool PreparedFlag = false; @@ -1013,8 +1016,13 @@ class TInsertTableRecordLoadContext { public: TInsertWriteId GetInsertWriteId() const { AFL_VERIFY(ParsedFlag); - AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed); - return (TInsertWriteId)WriteTxId; + return InsertWriteId; + } + + ui64 GetTxId() const { + AFL_VERIFY(ParsedFlag); + AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed); + return WriteTxId; } NColumnShard::Schema::EInsertTableIds GetRecType() const { @@ -1024,6 +1032,7 @@ class TInsertTableRecordLoadContext { ui64 GetPlanStep() const { AFL_VERIFY(ParsedFlag); + AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed); return PlanStep; } @@ -1035,19 +1044,12 @@ class TInsertTableRecordLoadContext { void Upsert(NIceDb::TNiceDb& db) const { AFL_VERIFY(ParsedFlag); using namespace NColumnShard; - if (RangeOffset) { - db.Table() - .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId) - .Update(NIceDb::TUpdate(BlobIdString), - NIceDb::TUpdate(*RangeOffset), - NIceDb::TUpdate(*RangeSize), NIceDb::TUpdate(MetadataString), - NIceDb::TUpdate(SchemaVersion)); - } else { - db.Table() - .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId) - .Update(NIceDb::TUpdate(BlobIdString), NIceDb::TUpdate(MetadataString), - NIceDb::TUpdate(SchemaVersion)); - } + db.Table() + .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId) + .Update(NIceDb::TUpdate(BlobIdString), + NIceDb::TUpdate(RangeOffset), + NIceDb::TUpdate(RangeSize), NIceDb::TUpdate(MetadataString), + NIceDb::TUpdate(SchemaVersion)); } template @@ -1059,41 +1061,40 @@ class TInsertTableRecordLoadContext { PlanStep = rowset.template GetValue(); WriteTxId = rowset.template GetValueOrDefault(); AFL_VERIFY(WriteTxId); + InsertWriteId = (TInsertWriteId)rowset.template GetValueOrDefault(WriteTxId); PathId = rowset.template GetValue(); DedupId = rowset.template GetValue(); - SchemaVersion = - rowset.template HaveValue() ? rowset.template GetValue() : 0; + SchemaVersion = rowset.template GetValueOrDefault(0); BlobIdString = rowset.template GetValue(); MetadataString = rowset.template GetValue(); - if (rowset.template HaveValue()) { - RangeOffset = rowset.template GetValue(); - } - if (rowset.template HaveValue()) { - RangeSize = rowset.template GetValue(); - } + AFL_VERIFY(rowset.template HaveValue()); + AFL_VERIFY(rowset.template HaveValue()); + RangeOffset = rowset.template GetValue(); + RangeSize = rowset.template GetValue(); } NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) { Prepare(dsGroupSelector); using namespace NColumnShard; AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed); - auto userData = std::make_shared(PathId, - NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt); + auto userData = std::make_shared( + PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt); AFL_VERIFY(!!DedupId); AFL_VERIFY(PlanStep); - return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId); + return NOlap::TCommittedData(userData, PlanStep, WriteTxId, InsertWriteId, DedupId); } NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) { Prepare(dsGroupSelector); using namespace NColumnShard; + AFL_VERIFY(InsertWriteId == (TInsertWriteId)WriteTxId)("insert", InsertWriteId)("write", WriteTxId); AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed); - auto userData = std::make_shared(PathId, - NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt); + auto userData = std::make_shared( + PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt); AFL_VERIFY(!DedupId); AFL_VERIFY(!PlanStep); - return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData); + return NOlap::TInsertedData(InsertWriteId, userData); } }; diff --git a/ydb/core/tx/columnshard/common/portion.h b/ydb/core/tx/columnshard/common/portion.h index d8497d5174a9..311cfa23269c 100644 --- a/ydb/core/tx/columnshard/common/portion.h +++ b/ydb/core/tx/columnshard/common/portion.h @@ -17,10 +17,12 @@ class TSpecialColumns { public: static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step"; static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id"; + static constexpr const char* SPEC_COL_WRITE_ID = "_yql_write_id"; static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag"; static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00; static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1; - static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2; + static const ui32 SPEC_COL_WRITE_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2; + static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 3; }; } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp index 439426439867..825f65f80106 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp @@ -76,6 +76,10 @@ std::vector TMerger::Execute(const std::shared TMergingContext mergingContext(batchResults, Batches); for (auto&& [columnId, columnData] : columnsData) { + if (columnId == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID && + (!HasAppData() || !AppDataVerified().FeatureFlags.GetEnableInsertWriteIdSpecialColumnCompatibility())) { + continue; + } const TString& columnName = resultFiltered->GetIndexInfo().GetColumnName(columnId); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", columnName)); auto columnInfo = stats->GetColumnInfo(columnId); @@ -125,13 +129,6 @@ std::vector TMerger::Execute(const std::shared AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())( "current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); } - auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); - auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); - Y_ABORT_UNLESS(columnSnapshotPlanStepIdx); - Y_ABORT_UNLESS(columnSnapshotTxIdx); - Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); - Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); - std::vector batchSlices; std::shared_ptr schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats)); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index ea7b6ddc2eb4..2f76ab4b1772 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -114,6 +114,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks( if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) { pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); } + dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::WRITE_ID); } resultFiltered = std::make_shared(resultSchema, dataColumnIds); { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 22ca7fd2c738..edce92470ad9 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -244,7 +244,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont batch = std::make_shared(NArrow::DeserializeBatch(blobData, batchSchema)); blobSchema->AdaptBatchToSchema(*batch, resultSchema); } - IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot()); + IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId()); auto& pathInfo = pathBatches.GetPathInfo(inserted.GetPathId()); diff --git a/ydb/core/tx/columnshard/engines/insert_table/committed.h b/ydb/core/tx/columnshard/engines/insert_table/committed.h index b075d9b8a390..141ca4cb5627 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/committed.h +++ b/ydb/core/tx/columnshard/engines/insert_table/committed.h @@ -9,6 +9,7 @@ class TCommittedData: public TUserDataContainer { private: using TBase = TUserDataContainer; YDB_READONLY(TSnapshot, Snapshot, NOlap::TSnapshot::Zero()); + YDB_READONLY(TInsertWriteId, InsertWriteId, (TInsertWriteId)0); YDB_READONLY_DEF(TString, DedupId); YDB_READONLY(bool, Remove, false); @@ -16,19 +17,23 @@ class TCommittedData: public TUserDataContainer { TCommittedData(const std::shared_ptr& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId) : TBase(userData) , Snapshot(planStep, txId) + , InsertWriteId(insertWriteId) , DedupId(ToString(planStep) + ":" + ToString((ui64)insertWriteId)) { } - TCommittedData(const std::shared_ptr& userData, const ui64 planStep, const ui64 txId, const TString& dedupId) + TCommittedData(const std::shared_ptr& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId, + const TString& dedupId) : TBase(userData) , Snapshot(planStep, txId) + , InsertWriteId(insertWriteId) , DedupId(dedupId) { } TCommittedData(const std::shared_ptr& userData, const TSnapshot& ss, const ui64 generation, const TInsertWriteId ephemeralWriteId) : TBase(userData) , Snapshot(ss) - , DedupId(ToString(generation) + ":" + ToString(ephemeralWriteId)) { + , InsertWriteId(ephemeralWriteId) + , DedupId(ToString(generation) + ":" + ToString((ui64)ephemeralWriteId)) { } void SetRemove() { @@ -52,7 +57,8 @@ class TCommittedData: public TUserDataContainer { class TCommittedBlob { private: TBlobRange BlobRange; - std::variant WriteInfo; + std::optional CommittedSnapshot; + const TInsertWriteId InsertWriteId; YDB_READONLY(ui64, SchemaVersion, 0); YDB_READONLY(ui64, RecordsCount, 0); YDB_READONLY(bool, IsDelete, false); @@ -61,6 +67,31 @@ class TCommittedBlob { YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset); public: + const std::optional& GetCommittedSnapshot() const { + return CommittedSnapshot; + } + + const TSnapshot& GetCommittedSnapshotDef(const TSnapshot& def) const { + if (CommittedSnapshot) { + return *CommittedSnapshot; + } else { + return def; + } + } + + const TSnapshot& GetCommittedSnapshotVerified() const { + AFL_VERIFY(!!CommittedSnapshot); + return *CommittedSnapshot; + } + + bool IsCommitted() const { + return !!CommittedSnapshot; + } + + TInsertWriteId GetInsertWriteId() const { + return InsertWriteId; + } + const NArrow::TReplaceKey& GetFirst() const { return First; } @@ -72,11 +103,12 @@ class TCommittedBlob { return BlobRange.Size; } - TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount, + TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const TInsertWriteId insertWriteId, const ui64 schemaVersion, const ui64 recordsCount, const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete, const NArrow::TSchemaSubset& subset) : BlobRange(blobRange) - , WriteInfo(snapshot) + , CommittedSnapshot(snapshot) + , InsertWriteId(insertWriteId) , SchemaVersion(schemaVersion) , RecordsCount(recordsCount) , IsDelete(isDelete) @@ -85,11 +117,11 @@ class TCommittedBlob { , SchemaSubset(subset) { } - TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount, + TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId insertWriteId, const ui64 schemaVersion, const ui64 recordsCount, const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete, const NArrow::TSchemaSubset& subset) : BlobRange(blobRange) - , WriteInfo(writeId) + , InsertWriteId(insertWriteId) , SchemaVersion(schemaVersion) , RecordsCount(recordsCount) , IsDelete(isDelete) @@ -107,43 +139,13 @@ class TCommittedBlob { return BlobRange.Hash(); } TString DebugString() const { - if (auto* ss = GetSnapshotOptional()) { - return TStringBuilder() << BlobRange << ";snapshot=" << ss->DebugString(); - } else { - return TStringBuilder() << BlobRange << ";write_id=" << (ui64)GetWriteIdVerified(); + TStringBuilder sb; + sb << BlobRange; + if (CommittedSnapshot) { + sb << ";snapshot=" << CommittedSnapshot->DebugString(); } - } - - bool HasSnapshot() const { - return GetSnapshotOptional(); - } - - const TSnapshot& GetSnapshotDef(const TSnapshot& def) const { - if (auto* snapshot = GetSnapshotOptional()) { - return *snapshot; - } else { - return def; - } - } - - const TSnapshot* GetSnapshotOptional() const { - return std::get_if(&WriteInfo); - } - - const TSnapshot& GetSnapshotVerified() const { - auto* result = GetSnapshotOptional(); - AFL_VERIFY(result); - return *result; - } - - const TInsertWriteId* GetWriteIdOptional() const { - return std::get_if(&WriteInfo); - } - - TInsertWriteId GetWriteIdVerified() const { - auto* result = GetWriteIdOptional(); - AFL_VERIFY(result); - return *result; + sb << ";write_id=" << GetInsertWriteId(); + return sb; } const TBlobRange& GetBlobRange() const { diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 7b8bd9334ac4..56a4f730a422 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -148,7 +148,7 @@ std::vector TInsertTable::Read(ui64 pathId, const std::optional< if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) { continue; } - result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), + result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetInsertWriteId(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), start, finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp index c24fbe0577a7..5d93272c07b7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp @@ -34,11 +34,11 @@ TConclusionStatus TReadMetadata::Init( if (LockId) { for (auto&& i : CommittedBlobs) { - if (auto writeId = i.GetWriteIdOptional()) { - if (owner->HasLongTxWrites(*writeId)) { + if (!i.IsCommitted()) { + if (owner->HasLongTxWrites(i.GetInsertWriteId())) { } else { - auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId); - AddWriteIdToCheck(*writeId, op->GetLockId()); + auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i.GetInsertWriteId()); + AddWriteIdToCheck(i.GetInsertWriteId(), op->GetLockId()); } } } @@ -125,7 +125,7 @@ void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalIm bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const { AFL_VERIFY(LockSharingInfo); auto it = ConflictedWriteIds.find(writeId); - AFL_VERIFY(it != ConflictedWriteIds.end()); + AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size()); return it->second.GetLockId() == LockSharingInfo->GetLockId(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp index 04ed0d1c6f26..5780b3219180 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp @@ -22,21 +22,21 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) sources.emplace_back(std::make_shared(sourceIdx++, i, SpecialReadContext)); } for (auto&& i : committed) { - if (i.HasSnapshot()) { + if (i.IsCommitted()) { continue; } - if (GetReadMetadata()->IsMyUncommitted(i.GetWriteIdVerified())) { + if (GetReadMetadata()->IsMyUncommitted(i.GetInsertWriteId())) { continue; } if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) || GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) { - GetReadMetadata()->SetConflictedWriteId(i.GetWriteIdVerified()); + GetReadMetadata()->SetConflictedWriteId(i.GetInsertWriteId()); } } for (auto&& i : committed) { - if (!i.HasSnapshot()) { - if (GetReadMetadata()->IsWriteConflictable(i.GetWriteIdVerified())) { + if (!i.IsCommitted()) { + if (GetReadMetadata()->IsWriteConflictable(i.GetInsertWriteId())) { continue; } } else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirst(), i.GetLast()) == diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index bef10d38f6b1..5e4d80fbfe43 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -225,9 +225,20 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr& AFL_VERIFY(rBatch)("schema", schema->ToString()); auto batch = std::make_shared(rBatch); batchSchema->AdaptBatchToSchema(*batch, resultSchema); - GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, CommittedBlob.GetSnapshotDef(TSnapshot::Zero())); + TSnapshot ss = TSnapshot::Zero(); + if (CommittedBlob.IsCommitted()) { + ss = CommittedBlob.GetCommittedSnapshotVerified(); + } else { + ss = GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetInsertWriteId()) + ? GetContext()->GetReadMetadata()->GetRequestSnapshot() + : TSnapshot::Zero(); + } + GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, ss, (ui64)CommittedBlob.GetInsertWriteId()); GetContext()->GetReadMetadata()->GetIndexInfo().AddDeleteFlagsColumn(*batch, CommittedBlob.GetIsDelete()); MutableStageData().AddBatch(batch); + if (CommittedBlob.GetIsDelete()) { + MutableStageData().AddFilter(NArrow::TColumnFilter::BuildDenyFilter()); + } } MutableStageData().SyncTableColumns(columns->GetSchema()->fields(), *resultSchema); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 889f9fe5e7d4..69b39059bff5 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -420,11 +420,11 @@ class TCommittedDataSource: public IDataSource { } virtual bool DoAddTxConflict() override { - if (CommittedBlob.HasSnapshot()) { + if (CommittedBlob.IsCommitted()) { GetContext()->GetReadMetadata()->SetBrokenWithCommitted(); return true; - } else if (!GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetWriteIdVerified())) { - GetContext()->GetReadMetadata()->SetConflictedWriteId(CommittedBlob.GetWriteIdVerified()); + } else if (!GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetInsertWriteId())) { + GetContext()->GetReadMetadata()->SetConflictedWriteId(CommittedBlob.GetInsertWriteId()); return true; } return false; @@ -467,8 +467,8 @@ class TCommittedDataSource: public IDataSource { } TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr& context) - : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetSnapshotDef(TSnapshot::Zero()), - committed.GetSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete()) + : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetCommittedSnapshotDef(TSnapshot::Zero()), + committed.GetCommittedSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete()) , CommittedBlob(committed) { } }; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp index 78fae44fb6d0..c1c31cb5487f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp @@ -21,11 +21,12 @@ void IIndexInfo::AddDeleteFlagsColumn(NArrow::TGeneralContainer& batch, const bo NArrow::TThreadSimpleArraysCache::GetConst(arrow::boolean(), std::make_shared(isDelete), numRows)).Validate(); } -void IIndexInfo::AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot) { +void IIndexInfo::AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot, const ui64 insertWriteId) { const i64 numRows = batch.num_rows(); batch.AddField(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows)).Validate(); batch.AddField(arrow::field(SPEC_COL_TX_ID, arrow::uint64()), NArrow::MakeUI64Array(snapshot.GetTxId(), numRows)).Validate(); + batch.AddField(arrow::field(SPEC_COL_WRITE_ID, arrow::uint64()), NArrow::MakeUI64Array(insertWriteId, numRows)).Validate(); } void IIndexInfo::NormalizeDeletionColumn(NArrow::TGeneralContainer& batch) { @@ -40,6 +41,8 @@ std::optional IIndexInfo::GetColumnIdOptional(const std::string& name) con return ui32(ESpecialColumn::PLAN_STEP); } else if (name == SPEC_COL_TX_ID) { return ui32(ESpecialColumn::TX_ID); + } else if (name == SPEC_COL_WRITE_ID) { + return ui32(ESpecialColumn::WRITE_ID); } else if (name == SPEC_COL_DELETE_FLAG) { return ui32(ESpecialColumn::DELETE_FLAG); } @@ -51,8 +54,10 @@ std::optional IIndexInfo::GetColumnIndexOptional(const std::string& name, return shift + 0; } else if (name == SPEC_COL_TX_ID) { return shift + 1; - } else if (name == SPEC_COL_DELETE_FLAG) { + } else if (name == SPEC_COL_WRITE_ID) { return shift + 2; + } else if (name == SPEC_COL_DELETE_FLAG) { + return shift + 3; } return {}; } @@ -62,6 +67,8 @@ TString IIndexInfo::GetColumnName(const ui32 id, const bool required) const { return SPEC_COL_PLAN_STEP; } else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) { return SPEC_COL_TX_ID; + } else if (ESpecialColumn(id) == ESpecialColumn::WRITE_ID) { + return SPEC_COL_WRITE_ID; } else if (ESpecialColumn(id) == ESpecialColumn::DELETE_FLAG) { return SPEC_COL_DELETE_FLAG; } else { @@ -88,6 +95,8 @@ std::shared_ptr IIndexInfo::GetColumnFieldOptional(const ui32 colu return ArrowSchemaSnapshot()->field(0); } else if (ESpecialColumn(columnId) == ESpecialColumn::TX_ID) { return ArrowSchemaSnapshot()->field(1); + } else if (ESpecialColumn(columnId) == ESpecialColumn::WRITE_ID) { + return ArrowSchemaSnapshot()->field(2); } else if (ESpecialColumn(columnId) == ESpecialColumn::DELETE_FLAG) { return ArrowSchemaDeletion()->field(0); } else { @@ -106,6 +115,8 @@ std::shared_ptr IIndexInfo::DefaultColumnValue(const ui32 colId) return nullptr; } else if (colId == (ui32)ESpecialColumn::TX_ID) { return nullptr; + } else if (colId == (ui32)ESpecialColumn::WRITE_ID) { + return nullptr; } else if (colId == (ui32)ESpecialColumn::DELETE_FLAG) { static const std::shared_ptr deleteDefault(new arrow::BooleanScalar(false)); return deleteDefault; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h index c986fe6b2aab..19fbe2267a7e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h @@ -16,7 +16,8 @@ class IIndexInfo { enum class ESpecialColumn : ui32 { PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX, TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX, - DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX + WRITE_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_WRITE_ID_INDEX, + DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX, }; using TSystemColumnsSet = ui64; @@ -28,6 +29,7 @@ class IIndexInfo { static constexpr const char* SPEC_COL_PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP; static constexpr const char* SPEC_COL_TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID; + static constexpr const char* SPEC_COL_WRITE_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_WRITE_ID; static constexpr const char* SPEC_COL_DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG; static const char* GetDeleteFlagColumnName() { @@ -35,17 +37,18 @@ class IIndexInfo { } static const std::set& GetNecessarySystemColumnIdsSet() { - static const std::set result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; + static const std::set result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID }; return result; } static const std::vector& GetSnapshotColumnNames() { - static const std::vector result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) }; + static const std::vector result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID), + std::string(SPEC_COL_WRITE_ID) }; return result; } static const std::vector& GetSnapshotColumnIds() { - static const std::vector result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; + static const std::vector result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID }; return result; } @@ -85,8 +88,10 @@ class IIndexInfo { static void AddSnapshotFields(std::vector>& fields) { static const std::shared_ptr ps = arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()); static const std::shared_ptr txid = arrow::field(SPEC_COL_TX_ID, arrow::uint64()); + static const std::shared_ptr writeId = arrow::field(SPEC_COL_WRITE_ID, arrow::uint64()); fields.push_back(ps); fields.push_back(txid); + fields.push_back(writeId); } static void AddDeleteFields(std::vector>& fields) { @@ -94,18 +99,18 @@ class IIndexInfo { } static const std::set& GetSnapshotColumnIdsSet() { - static const std::set result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; + static const std::set result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID }; return result; } static const std::vector& GetSystemColumnNames() { static const std::vector result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID), - std::string(SPEC_COL_DELETE_FLAG) }; + std::string(SPEC_COL_WRITE_ID), std::string(SPEC_COL_DELETE_FLAG) }; return result; } static const std::vector& GetSystemColumnIds() { - static const std::vector result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, + static const std::vector result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID, (ui32)ESpecialColumn::DELETE_FLAG }; return result; } @@ -143,7 +148,7 @@ class IIndexInfo { static void NormalizeDeletionColumn(NArrow::TGeneralContainer& batch); - static void AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot); + static void AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot, const ui64 insertWriteId); static void AddDeleteFlagsColumn(NArrow::TGeneralContainer& batch, const bool isDelete); static ui64 GetSpecialColumnsRecordSize() { @@ -151,8 +156,8 @@ class IIndexInfo { } static std::shared_ptr ArrowSchemaSnapshot() { - static std::shared_ptr result = std::make_shared( - arrow::FieldVector{ arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), arrow::field(SPEC_COL_TX_ID, arrow::uint64()) }); + static std::shared_ptr result = std::make_shared(arrow::FieldVector{ arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), + arrow::field(SPEC_COL_TX_ID, arrow::uint64()), arrow::field(SPEC_COL_WRITE_ID, arrow::uint64()) }); return result; } @@ -167,12 +172,13 @@ class IIndexInfo { } static bool IsSpecialColumn(const std::string& fieldName) { - return fieldName == SPEC_COL_PLAN_STEP || fieldName == SPEC_COL_TX_ID || fieldName == SPEC_COL_DELETE_FLAG; + return fieldName == SPEC_COL_PLAN_STEP || fieldName == SPEC_COL_TX_ID || fieldName == SPEC_COL_WRITE_ID || + fieldName == SPEC_COL_DELETE_FLAG; } static bool IsSpecialColumn(const ui32 fieldId) { return fieldId == (ui32)ESpecialColumn::PLAN_STEP || fieldId == (ui32)ESpecialColumn::TX_ID || - fieldId == (ui32)ESpecialColumn::DELETE_FLAG; + fieldId == (ui32)ESpecialColumn::WRITE_ID || fieldId == (ui32)ESpecialColumn::DELETE_FLAG; } static bool IsNullableVerified(const ui32 /*fieldId*/) { diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 622c022a1688..38dc6ffa044d 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -497,7 +497,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { ui64 txId = 1; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSnapshotColumnIdsSet().size()); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSnapshotColumnIdsSet().size() - 1); } { // select another pathId