Skip to content

Commit

Permalink
Fix MVCC tests; use the write id as a row feature for conflict resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Sep 21, 2024
1 parent 5321b21 commit d6be0b7
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 112 deletions.
6 changes: 4 additions & 2 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,10 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
}

TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const {
if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) {
return TIterator(reverse, expectedSize, LastValue);
if (IsTotalAllowFilter()) {
return TIterator(reverse, expectedSize, true);
} else if (IsTotalDenyFilter()) {
return TIterator(reverse, expectedSize, false);
} else {
AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse);
return TIterator(reverse, Filter, GetStartValue(reverse));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7903,10 +7903,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;\"-321\";\"-3.14\";[\"test_res_3\"]]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[\"-3.14\"]]");
testHelper.ReadData("SELECT resource_id FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");

testHelper.RebootTablets(testTable.GetName());
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");
}

Y_UNIT_TEST(AddColumnErrors) {
Expand Down
67 changes: 34 additions & 33 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,10 @@ struct Schema : NIceDb::Schema {

struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {};
struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {};
struct InsertWriteId: Column<13, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>;
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize>;
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize, InsertWriteId>;
};

struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
Expand Down Expand Up @@ -808,6 +809,7 @@ struct Schema : NIceDb::Schema {
.Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "")
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion()));
Expand All @@ -818,6 +820,7 @@ struct Schema : NIceDb::Schema {
.Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(),
data.GetDedupId())
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
Expand Down Expand Up @@ -982,15 +985,16 @@ class TInsertTableRecordLoadContext {
NColumnShard::Schema::EInsertTableIds RecType;
ui64 PlanStep;
ui64 WriteTxId;
TInsertWriteId InsertWriteId;
ui64 PathId;
YDB_ACCESSOR_DEF(TString, DedupId);
ui64 SchemaVersion;
TString BlobIdString;
std::optional<NOlap::TUnifiedBlobId> BlobId;
TString MetadataString;
std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
std::optional<ui64> RangeOffset;
std::optional<ui64> RangeSize;
ui64 RangeOffset;
ui64 RangeSize;

void Prepare(const IBlobGroupSelector* dsGroupSelector) {
AFL_VERIFY(!PreparedFlag);
Expand All @@ -1004,7 +1008,6 @@ class TInsertTableRecordLoadContext {
AFL_VERIFY(MetadataString);
Y_ABORT_UNLESS(meta.ParseFromString(MetadataString));
Metadata = std::move(meta);
AFL_VERIFY(!!RangeOffset == !!RangeSize);
}

bool PreparedFlag = false;
Expand All @@ -1013,8 +1016,13 @@ class TInsertTableRecordLoadContext {
public:
TInsertWriteId GetInsertWriteId() const {
AFL_VERIFY(ParsedFlag);
AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed);
return (TInsertWriteId)WriteTxId;
return InsertWriteId;
}

// Returns the value loaded from the WriteTxId key column of the insert-table row.
// Only valid after the record has been parsed (ParsedFlag) and only for rows whose
// RecType is Committed; both conditions are asserted via AFL_VERIFY.
// NOTE(review): for committed rows this column presumably carries the commit
// snapshot's tx id rather than a write id - confirm against the code that writes the row.
ui64 GetTxId() const {
AFL_VERIFY(ParsedFlag);
AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
return WriteTxId;
}

NColumnShard::Schema::EInsertTableIds GetRecType() const {
Expand All @@ -1024,6 +1032,7 @@ class TInsertTableRecordLoadContext {

// Returns the PlanStep loaded from the insert-table row.
// Asserts (AFL_VERIFY) that the record has been parsed and that its RecType is
// Committed, so calling this on any other record type fails the check.
ui64 GetPlanStep() const {
AFL_VERIFY(ParsedFlag);
// Restricts the accessor to committed rows.
AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
return PlanStep;
}

Expand All @@ -1035,19 +1044,12 @@ class TInsertTableRecordLoadContext {
void Upsert(NIceDb::TNiceDb& db) const {
AFL_VERIFY(ParsedFlag);
using namespace NColumnShard;
if (RangeOffset) {
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
} else {
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
}
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(RangeOffset),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
}

template <class TRowset>
Expand All @@ -1059,41 +1061,40 @@ class TInsertTableRecordLoadContext {
PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>();
WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>();
AFL_VERIFY(WriteTxId);
InsertWriteId = (TInsertWriteId)rowset.template GetValueOrDefault<Schema::InsertTable::InsertWriteId>(WriteTxId);

PathId = rowset.template GetValue<Schema::InsertTable::PathId>();
DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>();
SchemaVersion =
rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0;
SchemaVersion = rowset.template GetValueOrDefault<Schema::InsertTable::SchemaVersion>(0);
BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>();
MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>();
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) {
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
}
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) {
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
}
AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>());
AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>());
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
}

NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed);
auto userData = std::make_shared<NOlap::TUserData>(PathId,
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
auto userData = std::make_shared<NOlap::TUserData>(
PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!!DedupId);
AFL_VERIFY(PlanStep);
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId);
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, InsertWriteId, DedupId);
}

NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(InsertWriteId == (TInsertWriteId)WriteTxId)("insert", InsertWriteId)("write", WriteTxId);
AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed);
auto userData = std::make_shared<NOlap::TUserData>(PathId,
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
auto userData = std::make_shared<NOlap::TUserData>(
PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!DedupId);
AFL_VERIFY(!PlanStep);
return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData);
return NOlap::TInsertedData(InsertWriteId, userData);
}
};

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/common/portion.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ class TSpecialColumns {
public:
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
static constexpr const char* SPEC_COL_WRITE_ID = "_yql_write_id";
static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag";
static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00;
static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1;
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
static const ui32 SPEC_COL_WRITE_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 3;
};

}
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
blobSchema->AdaptBatchToSchema(*batch, resultSchema);
}
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot());
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId());

auto& pathInfo = pathBatches.GetPathInfo(inserted.GetPathId());

Expand Down
86 changes: 44 additions & 42 deletions ydb/core/tx/columnshard/engines/insert_table/committed.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,30 @@ class TCommittedData: public TUserDataContainer {
private:
using TBase = TUserDataContainer;
YDB_READONLY(TSnapshot, Snapshot, NOlap::TSnapshot::Zero());
YDB_READONLY(TInsertWriteId, InsertWriteId, (TInsertWriteId)0);
YDB_READONLY_DEF(TString, DedupId);
YDB_READONLY(bool, Remove, false);

public:
// Builds committed data from an explicit (planStep, txId) pair and the write id
// of the originating insert.
//   userData      - payload container passed to the TUserDataContainer base.
//   planStep/txId - used to construct the commit Snapshot.
//   insertWriteId - stored as InsertWriteId and also used to derive DedupId.
// DedupId is generated as "<planStep>:<insertWriteId as ui64>".
TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId)
: TBase(userData)
, Snapshot(planStep, txId)
, InsertWriteId(insertWriteId)
, DedupId(ToString(planStep) + ":" + ToString((ui64)insertWriteId)) {
}

TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TString& dedupId)
TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId,
const TString& dedupId)
: TBase(userData)
, Snapshot(planStep, txId)
, InsertWriteId(insertWriteId)
, DedupId(dedupId) {
}

// Builds committed data from an already constructed snapshot and the write id
// of the originating insert.
//   userData      - payload container passed to the TUserDataContainer base.
//   ss            - commit snapshot, copied into Snapshot.
//   insertWriteId - stored as InsertWriteId and also used to derive DedupId.
// DedupId is generated as "<ss.GetPlanStep()>:<insertWriteId as ui64>".
TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const TInsertWriteId insertWriteId)
: TBase(userData)
, Snapshot(ss)
, InsertWriteId(insertWriteId)
, DedupId(ToString(ss.GetPlanStep()) + ":" + ToString((ui64)insertWriteId)) {
}

Expand All @@ -52,7 +57,8 @@ class TCommittedData: public TUserDataContainer {
class TCommittedBlob {
private:
TBlobRange BlobRange;
std::variant<TSnapshot, TInsertWriteId> WriteInfo;
std::optional<TSnapshot> CommittedSnapshot;
const TInsertWriteId InsertWriteId;
YDB_READONLY(ui64, SchemaVersion, 0);
YDB_READONLY(ui64, RecordsCount, 0);
YDB_READONLY(bool, IsDelete, false);
Expand All @@ -61,6 +67,31 @@ class TCommittedBlob {
YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset);

public:
// Returns the stored commit snapshot as an optional; the optional is empty when
// no snapshot has been set on this blob. Use IsCommitted() to test for presence.
const std::optional<TSnapshot>& GetCommittedSnapshot() const {
return CommittedSnapshot;
}

// Returns the commit snapshot if one is set, otherwise returns `def`.
// The fallback is returned by reference to the caller's argument, so `def` must
// outlive every use of the result: passing a temporary and keeping the returned
// reference beyond the full expression would leave it dangling.
const TSnapshot& GetCommittedSnapshotDef(const TSnapshot& def) const {
if (CommittedSnapshot) {
return *CommittedSnapshot;
} else {
return def;
}
}

// Returns the commit snapshot, asserting (AFL_VERIFY) that one is set.
// Intended for call sites that already know the blob is committed; use
// GetCommittedSnapshot() or GetCommittedSnapshotDef() when it may be absent.
const TSnapshot& GetCommittedSnapshotVerified() const {
AFL_VERIFY(!!CommittedSnapshot);
return *CommittedSnapshot;
}

// True when a commit snapshot is stored for this blob, false otherwise.
bool IsCommitted() const {
return !!CommittedSnapshot;
}

// Returns the insert write id stored in this blob. Unlike the snapshot, the
// member is not optional, so no presence check is needed before calling.
TInsertWriteId GetInsertWriteId() const {
return InsertWriteId;
}

// Returns a reference to the stored First replace key.
// NOTE(review): presumably the smallest primary key covered by the blob (the
// constructors take matching `first`/`last` arguments) - confirm with the member declaration.
const NArrow::TReplaceKey& GetFirst() const {
return First;
}
Expand All @@ -72,11 +103,12 @@ class TCommittedBlob {
return BlobRange.Size;
}

TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount,
TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const TInsertWriteId insertWriteId, const ui64 schemaVersion, const ui64 recordsCount,
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
const NArrow::TSchemaSubset& subset)
: BlobRange(blobRange)
, WriteInfo(snapshot)
, CommittedSnapshot(snapshot)
, InsertWriteId(insertWriteId)
, SchemaVersion(schemaVersion)
, RecordsCount(recordsCount)
, IsDelete(isDelete)
Expand All @@ -85,11 +117,11 @@ class TCommittedBlob {
, SchemaSubset(subset) {
}

TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount,
TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId insertWriteId, const ui64 schemaVersion, const ui64 recordsCount,
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
const NArrow::TSchemaSubset& subset)
: BlobRange(blobRange)
, WriteInfo(writeId)
, InsertWriteId(insertWriteId)
, SchemaVersion(schemaVersion)
, RecordsCount(recordsCount)
, IsDelete(isDelete)
Expand All @@ -107,43 +139,13 @@ class TCommittedBlob {
return BlobRange.Hash();
}
TString DebugString() const {
if (auto* ss = GetSnapshotOptional()) {
return TStringBuilder() << BlobRange << ";snapshot=" << ss->DebugString();
} else {
return TStringBuilder() << BlobRange << ";write_id=" << (ui64)GetWriteIdVerified();
TStringBuilder sb;
sb << BlobRange;
if (CommittedSnapshot) {
sb << ";snapshot=" << CommittedSnapshot->DebugString();
}
}

bool HasSnapshot() const {
return GetSnapshotOptional();
}

const TSnapshot& GetSnapshotDef(const TSnapshot& def) const {
if (auto* snapshot = GetSnapshotOptional()) {
return *snapshot;
} else {
return def;
}
}

const TSnapshot* GetSnapshotOptional() const {
return std::get_if<TSnapshot>(&WriteInfo);
}

const TSnapshot& GetSnapshotVerified() const {
auto* result = GetSnapshotOptional();
AFL_VERIFY(result);
return *result;
}

const TInsertWriteId* GetWriteIdOptional() const {
return std::get_if<TInsertWriteId>(&WriteInfo);
}

TInsertWriteId GetWriteIdVerified() const {
auto* result = GetWriteIdOptional();
AFL_VERIFY(result);
return *result;
sb << ";write_id=" << GetInsertWriteId();
return sb;
}

const TBlobRange& GetBlobRange() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {
continue;
}
result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetInsertWriteId(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
start, finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ TConclusionStatus TReadMetadata::Init(

if (LockId) {
for (auto&& i : CommittedBlobs) {
if (auto writeId = i.GetWriteIdOptional()) {
if (owner->HasLongTxWrites(*writeId)) {
if (!i.IsCommitted()) {
if (owner->HasLongTxWrites(i.GetInsertWriteId())) {
} else {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
AddWriteIdToCheck(*writeId, op->GetLockId());
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i.GetInsertWriteId());
AddWriteIdToCheck(i.GetInsertWriteId(), op->GetLockId());
}
}
}
Expand Down Expand Up @@ -125,7 +125,7 @@ void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalIm
bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const {
AFL_VERIFY(LockSharingInfo);
auto it = ConflictedWriteIds.find(writeId);
AFL_VERIFY(it != ConflictedWriteIds.end());
AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size());
return it->second.GetLockId() == LockSharingInfo->GetLockId();
}

Expand Down
Loading

0 comments on commit d6be0b7

Please sign in to comment.