Skip to content

Commit

Permalink
fix mvcc tests. use write id as row feature for conflicts resolving (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 23, 2024
1 parent b47a8b8 commit 6862d3c
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 122 deletions.
6 changes: 4 additions & 2 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,10 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
}

TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const {
if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) {
return TIterator(reverse, expectedSize, LastValue);
if (IsTotalAllowFilter()) {
return TIterator(reverse, expectedSize, true);
} else if (IsTotalDenyFilter()) {
return TIterator(reverse, expectedSize, false);
} else {
AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse);
return TIterator(reverse, Filter, GetStartValue(reverse));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7903,10 +7903,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;\"-321\";\"-3.14\";[\"test_res_3\"]]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[\"-3.14\"]]");
testHelper.ReadData("SELECT resource_id FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");

testHelper.RebootTablets(testTable.GetName());
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");
}

Y_UNIT_TEST(AddColumnErrors) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,6 @@ message TFeatureFlags {
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
optional bool EnableSparsedColumns = 144 [default = false];
optional bool EnableParameterizedDecimal = 145 [default = false];
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
optional bool EnableInsertWriteIdSpecialColumnCompatibility = 147 [default = false];
}
67 changes: 34 additions & 33 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,10 @@ struct Schema : NIceDb::Schema {

struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {};
struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {};
struct InsertWriteId: Column<13, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>;
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize>;
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize, InsertWriteId>;
};

struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
Expand Down Expand Up @@ -808,6 +809,7 @@ struct Schema : NIceDb::Schema {
.Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "")
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion()));
Expand All @@ -818,6 +820,7 @@ struct Schema : NIceDb::Schema {
.Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(),
data.GetDedupId())
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
Expand Down Expand Up @@ -982,15 +985,16 @@ class TInsertTableRecordLoadContext {
NColumnShard::Schema::EInsertTableIds RecType;
ui64 PlanStep;
ui64 WriteTxId;
TInsertWriteId InsertWriteId;
ui64 PathId;
YDB_ACCESSOR_DEF(TString, DedupId);
ui64 SchemaVersion;
TString BlobIdString;
std::optional<NOlap::TUnifiedBlobId> BlobId;
TString MetadataString;
std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
std::optional<ui64> RangeOffset;
std::optional<ui64> RangeSize;
ui64 RangeOffset;
ui64 RangeSize;

void Prepare(const IBlobGroupSelector* dsGroupSelector) {
AFL_VERIFY(!PreparedFlag);
Expand All @@ -1004,7 +1008,6 @@ class TInsertTableRecordLoadContext {
AFL_VERIFY(MetadataString);
Y_ABORT_UNLESS(meta.ParseFromString(MetadataString));
Metadata = std::move(meta);
AFL_VERIFY(!!RangeOffset == !!RangeSize);
}

bool PreparedFlag = false;
Expand All @@ -1013,8 +1016,13 @@ class TInsertTableRecordLoadContext {
public:
TInsertWriteId GetInsertWriteId() const {
AFL_VERIFY(ParsedFlag);
AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed);
return (TInsertWriteId)WriteTxId;
return InsertWriteId;
}

ui64 GetTxId() const {
AFL_VERIFY(ParsedFlag);
AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
return WriteTxId;
}

NColumnShard::Schema::EInsertTableIds GetRecType() const {
Expand All @@ -1024,6 +1032,7 @@ class TInsertTableRecordLoadContext {

ui64 GetPlanStep() const {
AFL_VERIFY(ParsedFlag);
AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
return PlanStep;
}

Expand All @@ -1035,19 +1044,12 @@ class TInsertTableRecordLoadContext {
void Upsert(NIceDb::TNiceDb& db) const {
AFL_VERIFY(ParsedFlag);
using namespace NColumnShard;
if (RangeOffset) {
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
} else {
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
}
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(RangeOffset),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
}

template <class TRowset>
Expand All @@ -1059,41 +1061,40 @@ class TInsertTableRecordLoadContext {
PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>();
WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>();
AFL_VERIFY(WriteTxId);
InsertWriteId = (TInsertWriteId)rowset.template GetValueOrDefault<Schema::InsertTable::InsertWriteId>(WriteTxId);

PathId = rowset.template GetValue<Schema::InsertTable::PathId>();
DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>();
SchemaVersion =
rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0;
SchemaVersion = rowset.template GetValueOrDefault<Schema::InsertTable::SchemaVersion>(0);
BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>();
MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>();
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) {
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
}
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) {
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
}
AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>());
AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>());
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
}

NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed);
auto userData = std::make_shared<NOlap::TUserData>(PathId,
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
auto userData = std::make_shared<NOlap::TUserData>(
PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!!DedupId);
AFL_VERIFY(PlanStep);
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId);
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, InsertWriteId, DedupId);
}

NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(InsertWriteId == (TInsertWriteId)WriteTxId)("insert", InsertWriteId)("write", WriteTxId);
AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed);
auto userData = std::make_shared<NOlap::TUserData>(PathId,
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
auto userData = std::make_shared<NOlap::TUserData>(
PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!DedupId);
AFL_VERIFY(!PlanStep);
return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData);
return NOlap::TInsertedData(InsertWriteId, userData);
}
};

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/common/portion.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ class TSpecialColumns {
public:
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
static constexpr const char* SPEC_COL_WRITE_ID = "_yql_write_id";
static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag";
static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00;
static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1;
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
static const ui32 SPEC_COL_WRITE_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 3;
};

}
11 changes: 4 additions & 7 deletions ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
TMergingContext mergingContext(batchResults, Batches);

for (auto&& [columnId, columnData] : columnsData) {
if (columnId == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID &&
(!HasAppData() || !AppDataVerified().FeatureFlags.GetEnableInsertWriteIdSpecialColumnCompatibility())) {
continue;
}
const TString& columnName = resultFiltered->GetIndexInfo().GetColumnName(columnId);
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", columnName));
auto columnInfo = stats->GetColumnInfo(columnId);
Expand Down Expand Up @@ -125,13 +129,6 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())(
"current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
}
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx);
Y_ABORT_UNLESS(columnSnapshotTxIdx);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);

std::vector<TGeneralSerializedSlice> batchSlices;
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::WRITE_ID);
}
resultFiltered = std::make_shared<TFilteredSnapshotSchema>(resultSchema, dataColumnIds);
{
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
blobSchema->AdaptBatchToSchema(*batch, resultSchema);
}
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot());
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId());

auto& pathInfo = pathBatches.GetPathInfo(inserted.GetPathId());

Expand Down
Loading

0 comments on commit 6862d3c

Please sign in to comment.