Skip to content

Commit

Permalink
Register pathes for insert table (#9881)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 1, 2024
1 parent ce9f20d commit b5341bd
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 23 deletions.
27 changes: 15 additions & 12 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,6 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
NIceDb::TNiceDb db(txc.DB);
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
{
ACFL_DEBUG("step", "TInsertTable::Load_Start");
TMemoryProfileGuard g("TTxInit/InsertTable");
auto localInsertTable = std::make_unique<NOlap::TInsertTable>();
if (!localInsertTable->Load(db, dbTable, TAppData::TimeProvider->Now())) {
ACFL_ERROR("step", "TInsertTable::Load_Fails");
return false;
}
ACFL_DEBUG("step", "TInsertTable::Load_Finish");
Self->InsertTable.swap(localInsertTable);
}

{
ACFL_DEBUG("step", "TTablesManager::Load_Start");
TTablesManager tManagerLocal(Self->StoragesManager, Self->TabletID());
Expand All @@ -138,6 +126,21 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
ACFL_DEBUG("step", "TTablesManager::Load_Finish");
}

{
ACFL_DEBUG("step", "TInsertTable::Load_Start");
TMemoryProfileGuard g("TTxInit/InsertTable");
auto localInsertTable = std::make_unique<NOlap::TInsertTable>();
for (auto&& i : Self->TablesManager.GetTables()) {
localInsertTable->RegisterPathInfo(i.first);
}
if (!localInsertTable->Load(db, dbTable, TAppData::TimeProvider->Now())) {
ACFL_ERROR("step", "TInsertTable::Load_Fails");
return false;
}
ACFL_DEBUG("step", "TInsertTable::Load_Finish");
Self->InsertTable.swap(localInsertTable);
}

{
ACFL_DEBUG("step", "TTxController::Load_Start");
TMemoryProfileGuard g("TTxInit/TTxController");
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());

TablesManager.AddTableVersion(pathId, version, tableVerProto, db, Tiers);
InsertTable->RegisterPathInfo(pathId);

Counters.GetTabletCounters()->SetCounter(COUNTER_TABLES, TablesManager.GetTables().size());
Counters.GetTabletCounters()->SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ TInsertionSummary::TCounters TInsertTable::CommitEphemeral(IDbWrapper& dbTable,

AddBlobLink(data.GetBlobRange().BlobId);
const ui64 pathId = data.GetPathId();
auto& pathInfo = Summary.GetPathInfo(pathId);
auto& pathInfo = Summary.GetPathInfoVerified(pathId);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", pathId)("blob_range", data.GetBlobRange().ToString());
dbTable.Commit(data);
pathInfo.AddCommitted(std::move(data));
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/columnshard/engines/insert_table/insert_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class TInsertTableAccessor {
bool RemoveBlobLinkOnComplete(const TUnifiedBlobId& blobId);

public:
TPathInfo& RegisterPathInfo(const ui64 pathId) {
return Summary.RegisterPathInfo(pathId);
}

void ErasePath(const ui64 pathId) {
Summary.ErasePath(pathId);
}
Expand Down Expand Up @@ -64,7 +68,7 @@ class TInsertTableAccessor {
AddBlobLink(data.GetBlobRange().BlobId);
}
const ui64 pathId = data.GetPathId();
return Summary.GetPathInfo(pathId).AddCommitted(std::move(data), load);
return Summary.GetPathInfoVerified(pathId).AddCommitted(std::move(data), load);
}
bool HasPathIdData(const ui64 pathId) const {
return Summary.HasPathIdData(pathId);
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ void TInsertionSummary::AddPriority(const TPathInfo& pathInfo) noexcept {
}
}

NKikimr::NOlap::TPathInfo& TInsertionSummary::GetPathInfo(const ui64 pathId) {
TPathInfo& TInsertionSummary::RegisterPathInfo(const ui64 pathId) {
auto it = PathInfo.find(pathId);
if (it == PathInfo.end()) {
it = PathInfo.emplace(pathId, TPathInfo(*this, pathId)).first;
}
return it->second;
}

NKikimr::NOlap::TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui64 pathId) {
TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui64 pathId) {
auto it = PathInfo.find(pathId);
if (it == PathInfo.end()) {
return nullptr;
}
return &it->second;
}

const NKikimr::NOlap::TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui64 pathId) const {
const TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui64 pathId) const {
auto it = PathInfo.find(pathId);
if (it == PathInfo.end()) {
return nullptr;
Expand Down Expand Up @@ -134,7 +134,7 @@ bool TInsertionSummary::HasCommitted(const TCommittedData& data) {
return pathInfo->HasCommitted(data);
}

const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) {
const TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) {
const TInsertWriteId writeId = data.GetInsertWriteId();
Counters.Aborted.Add(data.BlobSize(), load);
AFL_VERIFY_DEBUG(!Inserted.contains(writeId));
Expand All @@ -143,7 +143,7 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData
return &insertInfo.first->second;
}

std::optional<NKikimr::NOlap::TInsertedData> TInsertionSummary::ExtractInserted(const TInsertWriteId id) {
std::optional<TInsertedData> TInsertionSummary::ExtractInserted(const TInsertWriteId id) {
auto result = Inserted.ExtractOptional(id);
if (result) {
auto pathInfo = GetPathInfoOptional(result->GetPathId());
Expand All @@ -154,10 +154,10 @@ std::optional<NKikimr::NOlap::TInsertedData> TInsertionSummary::ExtractInserted(
return result;
}

const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) {
const TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) {
auto* insertInfo = Inserted.AddVerified(std::move(data));
AFL_VERIFY_DEBUG(!Aborted.contains(insertInfo->GetInsertWriteId()));
OnNewInserted(GetPathInfo(insertInfo->GetPathId()), insertInfo->BlobSize(), load);
OnNewInserted(GetPathInfoVerified(insertInfo->GetPathId()), insertInfo->BlobSize(), load);
return insertInfo;
}

Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,19 @@ class TInsertionSummary {
const NColumnShard::TInsertTableCounters& GetCounters() const {
return Counters;
}
NKikimr::NOlap::TPathInfo& GetPathInfo(const ui64 pathId);
NKikimr::NOlap::TPathInfo& RegisterPathInfo(const ui64 pathId);
TPathInfo* GetPathInfoOptional(const ui64 pathId);
const TPathInfo* GetPathInfoOptional(const ui64 pathId) const;
TPathInfo& GetPathInfoVerified(const ui64 pathId) {
auto* result = GetPathInfoOptional(pathId);
AFL_VERIFY(result);
return *result;
}
const TPathInfo& GetPathInfoVerified(const ui64 pathId) const {
auto* result = GetPathInfoOptional(pathId);
AFL_VERIFY(result);
return *result;
}

const THashMap<ui64, TPathInfo>& GetPathInfo() const {
return PathInfo;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {

// insert, not commited
auto userData1 = std::make_shared<TUserData>(tableId, TBlobRange(blobId1), TLocalHelper::GetMetaProto(), indexSnapshot, std::nullopt);
insertTable.RegisterPathInfo(tableId);
bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, userData1));
UNIT_ASSERT(ok);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class TWriteAggregation {
YDB_READONLY(ui64, Size, 0);
YDB_READONLY(ui64, Rows, 0);
YDB_ACCESSOR_DEF(std::vector<TWideSerializedBatch>, SplittedBlobs);
YDB_READONLY_DEF(TVector<TInsertWriteId>, InsertWriteIds);
YDB_READONLY_DEF(std::vector<TInsertWriteId>, InsertWriteIds);
YDB_READONLY_DEF(std::shared_ptr<NOlap::IBlobsWritingAction>, BlobsAction);
YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset);
std::shared_ptr<arrow::RecordBatch> RecordBatch;
Expand Down

0 comments on commit b5341bd

Please sign in to comment.