Skip to content

Commit

Permalink
Request DataShard compaction if scheme has been changed (#11147)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored Nov 12, 2024
1 parent 91cc642 commit edeb17d
Show file tree
Hide file tree
Showing 25 changed files with 527 additions and 149 deletions.
4 changes: 4 additions & 0 deletions ydb/core/protos/table_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@ message TTableStats {
optional TStoragePoolsStats StoragePools = 31;

optional uint64 ByKeyFilterSize = 32;

// Denotes that the datashard should be background compacted
// even if it consists of a single part
optional bool HasSchemaChanges = 33;
}
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_comp.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ namespace NTable {
/**
* Returns row schema of the specified table
*/
virtual TIntrusiveConstPtr<TRowScheme> RowScheme(ui32 table) = 0;
virtual TIntrusiveConstPtr<TRowScheme> RowScheme(ui32 table) const = 0;

/**
* Returns schema of the specified table
Expand Down
100 changes: 83 additions & 17 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4293,7 +4293,7 @@ const NTable::TScheme& TExecutor::DatabaseScheme()
return Scheme();
}

TIntrusiveConstPtr<NTable::TRowScheme> TExecutor::RowScheme(ui32 table)
TIntrusiveConstPtr<NTable::TRowScheme> TExecutor::RowScheme(ui32 table) const
{
return Database->GetRowScheme(table);
}
Expand Down Expand Up @@ -4334,6 +4334,80 @@ const NTable::TRowVersionRanges& TExecutor::TableRemovedRowVersions(ui32 table)
return Database->GetRemovedRowVersions(table);
}

// Table-level check: reports whether at least one part of `table` no longer
// matches the table's current scheme.
//
// Returns false when either the table info or the row scheme cannot be
// obtained. Only the Flatten parts of the requested subset are inspected;
// each one is delegated to the per-part overload.
bool TExecutor::HasSchemaChanges(ui32 table) const {
    auto *info = Scheme().GetTableInfo(table);
    auto scheme = RowScheme(table);

    // Both lookups are performed before bailing out, nothing to compare otherwise
    const bool comparable = info && scheme;
    if (!comparable) {
        return false;
    }

    bool changed = false;

    auto parts = Database->Subset(table, NTable::TEpoch::Max(), { } , { });
    for (const auto& part : parts->Flatten) {
        if (HasSchemaChanges(part, *info, *scheme)) {
            // One outdated part is enough
            changed = true;
            break;
        }
    }

    return changed;
}

// Part-level check: compares how a single part is laid out with what the
// current table scheme prescribes and returns true on the first mismatch.
//
// Checks, in order:
//   1. by-key filter presence vs. the scheme's ByKeyFilter setting;
//   2. B-Tree index presence (only when the EnableLocalDBBtreeIndex flag is on);
//   3. number of groups vs. number of families, then each group's channel
//      vs. the Main channel of the family's room;
//   4. column tag -> group assignment.
//
// A part reporting zero rows is never considered changed.
// Aborts if a family refers to a room that is missing from the table info.
bool TExecutor::HasSchemaChanges(const NTable::TPartView& partView, const NTable::TScheme::TTableInfo& tableInfo, const NTable::TRowScheme& rowScheme) const {
    if (partView.Part->Stat.Rows == 0) {
        return false;
    }

    // 1. By-key filter existence
    const bool filterInPart = bool(partView->ByKey);
    const bool filterInScheme = tableInfo.ByKeyFilter;
    if (filterInPart != filterInScheme) {
        return true;
    }

    // 2. B-Tree index existence, the part is inspected only when the feature is enabled
    if (AppData()->FeatureFlags.GetEnableLocalDBBtreeIndex()) {
        if (!partView->IndexPages.HasBTree()) {
            return true;
        }
    }

    // 3. Families: count first, then the channel of every group
    const size_t groupsInPart = partView->GroupsCount;
    const size_t familiesInScheme = rowScheme.Families.size();
    if (groupsInPart != familiesInScheme) {
        return true;
    }

    for (size_t group = 0; group < familiesInScheme; ++group) {
        static const NTable::TScheme::TFamily defaultFamilySettings;
        const auto& settings = tableInfo.Families.ValueRef(rowScheme.Families[group], defaultFamilySettings); // Workaround for KIKIMR-17222

        const auto* schemeGroupRoom = tableInfo.Rooms.FindPtr(settings.Room);
        Y_ABORT_UNLESS(schemeGroupRoom, "Cannot find room %" PRIu32 " in table %" PRIu32, settings.Room, tableInfo.Id);

        const ui32 channelInPart = partView.Part->GetGroupChannel(NTable::NPage::TGroupId(group));
        if (channelInPart != schemeGroupRoom->Main) {
            return true;
        }
    }

    // 4. Columns: the tag -> group mapping must be the same on both sides
    THashMap<NTable::TTag, ui32> groupByTagInPart;
    for (const auto& partColumn : partView->Scheme->AllColumns) {
        groupByTagInPart[partColumn.Tag] = partColumn.Group;
    }

    THashMap<NTable::TTag, ui32> groupByTagInScheme;
    for (const auto& schemeColumn : rowScheme.Cols) {
        groupByTagInScheme[schemeColumn.Tag] = schemeColumn.Group;
    }

    return groupByTagInPart != groupByTagInScheme;
}

ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)
{
if (auto logl = Logger->Log(ELnLev::Info))
Expand Down Expand Up @@ -4379,37 +4453,29 @@ ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)

for (size_t group : xrange(rowScheme->Families.size())) {
auto familyId = rowScheme->Families[group];
const auto* family = tableInfo->Families.FindPtr(familyId);
if (Y_UNLIKELY(!family)) {
// FIXME: workaround for KIKIMR-17222
// Column families with default settings may be missing in schema,
// so we have to use a static variable as a substitute
static const NTable::TScheme::TFamily defaultFamilySettings;
family = &defaultFamilySettings;
}
Y_ABORT_UNLESS(family, "Cannot find family %" PRIu32 " in table %" PRIu32, familyId, table);
static const NTable::TScheme::TFamily defaultFamilySettings;
const auto& family = tableInfo->Families.ValueRef(familyId, defaultFamilySettings); // Workaround for KIKIMR-17222

auto roomId = family->Room;
auto* room = tableInfo->Rooms.FindPtr(roomId);
Y_ABORT_UNLESS(room, "Cannot find room %" PRIu32 " in table %" PRIu32, roomId, table);
auto* room = tableInfo->Rooms.FindPtr(family.Room);
Y_ABORT_UNLESS(room, "Cannot find room %" PRIu32 " in table %" PRIu32, family.Room, table);

auto& pageGroup = comp->Layout.Groups.at(group);
auto& writeGroup = comp->Writer.Groups.at(group);

pageGroup.Codec = family->Codec;
pageGroup.Codec = family.Codec;
pageGroup.PageSize = policy->MinDataPageSize;
pageGroup.BTreeIndexNodeTargetSize = policy->MinBTreeIndexNodeSize;
pageGroup.BTreeIndexNodeKeysMin = policy->MinBTreeIndexNodeKeys;

writeGroup.Cache = Max(family->Cache, cache);
writeGroup.Cache = Max(family.Cache, cache);
writeGroup.MaxBlobSize = NBlockIO::BlockSize;
writeGroup.Channel = room->Main;
addChannel(room->Main);

if (group == 0) {
// Small/Large edges are taken from the leader family
comp->Layout.SmallEdge = family->Small;
comp->Layout.LargeEdge = family->Large;
comp->Layout.SmallEdge = family.Small;
comp->Layout.LargeEdge = family.Large;

// Small/Large channels are taken from the leader family
comp->Writer.BlobsChannels = room->Blobs;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ class TExecutor

ui64 OwnerTabletId() const override;
const NTable::TScheme& DatabaseScheme() override;
TIntrusiveConstPtr<NTable::TRowScheme> RowScheme(ui32 table) override;
TIntrusiveConstPtr<NTable::TRowScheme> RowScheme(ui32 table) const override;
const NTable::TScheme::TTableInfo* TableScheme(ui32 table) override;
ui64 TableMemSize(ui32 table, NTable::TEpoch epoch) override;
NTable::TPartView TablePart(ui32 table, const TLogoBlobID& label) override;
Expand Down Expand Up @@ -652,6 +652,8 @@ class TExecutor
bool CancelScan(ui32 tableId, ui64 taskId) override;

TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 tableId) const override;
bool HasSchemaChanges(ui32 table) const override;
bool HasSchemaChanges(const NTable::TPartView& partView, const NTable::TScheme::TTableInfo& tableInfo, const NTable::TRowScheme& rowScheme) const;
ui64 CompactBorrowed(ui32 tableId) override;
ui64 CompactMemTable(ui32 tableId) override;
ui64 CompactTable(ui32 tableId) override;
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tablet_flat/flat_stat_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ bool BuildStats(const TSubset& subset, TStats& stats, ui64 rowCountResolution, u
}

void GetPartOwners(const TSubset& subset, THashSet<ui64>& partOwners) {
for (auto& pi : subset.Flatten) {
partOwners.insert(pi->Label.TabletID());
for (const auto& partView : subset.Flatten) {
partOwners.insert(partView->Label.TabletID());
}
for (auto& pi : subset.ColdParts) {
partOwners.insert(pi->Label.TabletID());
for (const auto& coldPart : subset.ColdParts) {
partOwners.insert(coldPart->Label.TabletID());
}
for (auto& pi : subset.TxStatus) {
partOwners.insert(pi->Label.TabletID());
for (const auto& txStatus : subset.TxStatus) {
partOwners.insert(txStatus->Label.TabletID());
}
}

Expand Down
13 changes: 6 additions & 7 deletions ydb/core/tablet_flat/flat_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1398,13 +1398,12 @@ bool TTable::RemoveRowVersions(const TRowVersion& lower, const TRowVersion& uppe

TCompactionStats TTable::GetCompactionStats() const
{
TCompactionStats stats;
stats.MemRowCount = GetMemRowCount();
stats.MemDataSize = GetMemSize();
stats.MemDataWaste = GetMemWaste();
stats.PartCount = Flatten.size() + ColdParts.size();

return stats;
return {
.PartCount = Flatten.size() + ColdParts.size(),
.MemRowCount = GetMemRowCount(),
.MemDataSize = GetMemSize(),
.MemDataWaste = GetMemWaste(),
};
}

void TTable::SetTableObserver(TIntrusivePtr<ITableObserver> ptr) noexcept
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/tablet_flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ namespace NFlatExecutorSetup {

// edge and ts of last full compaction
virtual TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 tableId) const = 0;
virtual bool HasSchemaChanges(ui32 table) const = 0;

// Forces full compaction of the specified table in the near future
// Returns 0 if can't compact, otherwise compaction ID
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/ut/flat_comp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TSimpleBackend : public ICompactionBackend {
return DB.GetScheme();
}

TIntrusiveConstPtr<NKikimr::NTable::TRowScheme> RowScheme(ui32 table) override {
TIntrusiveConstPtr<NKikimr::NTable::TRowScheme> RowScheme(ui32 table) const override {
return DB.GetRowScheme(table);
}

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll)
, CanCancelROWithReadSets(0, 0, 1)
, PerShardReadSizeLimit(5368709120, 0, 107374182400)
, CpuUsageReportThreshlodPercent(60, -1, 146)
, CpuUsageReportThresholdPercent(60, -1, 146)
, CpuUsageReportIntervalSeconds(60, 0, 365*86400)
, HighDataSizeReportThreshlodBytes(10ull<<30, -1, Max<i64>())
, HighDataSizeReportThresholdBytes(10ull<<30, -1, Max<i64>())
, HighDataSizeReportIntervalSeconds(60, 0, 365*86400)
, DataTxProfileLogThresholdMs(0, 0, 86400000)
, DataTxProfileBufferThresholdMs(0, 0, 86400000)
Expand Down Expand Up @@ -308,9 +308,9 @@ void TDataShard::IcbRegister() {

appData->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets");
appData->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit");
appData->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
appData->Icb->RegisterSharedControl(CpuUsageReportThresholdPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
appData->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds");
appData->Icb->RegisterSharedControl(HighDataSizeReportThreshlodBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
appData->Icb->RegisterSharedControl(HighDataSizeReportThresholdBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
appData->Icb->RegisterSharedControl(HighDataSizeReportIntervalSeconds, "DataShardControls.HighDataSizeReportIntervalSeconds");

appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/datashard__compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class TDataShard::TTxCompactTable : public NTabletFlatExecutor::TTransactionBase
auto stats = txc.DB.GetCompactionStats(localTid);
bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0;
bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0;
if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) {
bool hasSchemaChanges = Self->Executor()->HasSchemaChanges(tableInfo.LocalTid);
if (isEmpty || isSingleParted && !hasBorrowed && !hasSchemaChanges && !record.GetCompactSinglePartedShards()) {
// nothing to compact
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Background compaction of tablet# " << Self->TabletID()
Expand Down
Loading

0 comments on commit edeb17d

Please sign in to comment.