Skip to content

Commit

Permalink
Fix bugs in global kmeans
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt committed Jan 27, 2025
1 parent d050a4a commit 8a31f17
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 31 deletions.
8 changes: 5 additions & 3 deletions ydb/core/tx/datashard/kmeans_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ struct TMaxInnerProductSimilarity: TMetric<T> {

template <typename TMetric>
struct TCalculation: TMetric {
ui32 FindClosest(std::span<const TString> clusters, const char* embedding) const
ui32 FindClosest(std::span<const TString> clusters, TArrayRef<const char> embedding) const
{
Y_DEBUG_ABORT_UNLESS(this->IsExpectedSize(embedding));
auto min = this->Init();
ui32 closest = std::numeric_limits<ui32>::max();
for (size_t i = 0; const auto& cluster : clusters) {
auto distance = this->Distance(cluster.data(), embedding);
Y_DEBUG_ABORT_UNLESS(this->IsExpectedSize(cluster));
auto distance = this->Distance(cluster.data(), embedding.data());
if (distance < min) {
min = distance;
closest = i;
Expand All @@ -195,7 +197,7 @@ ui32 FeedEmbedding(const TCalculation<TMetric>& calculation, std::span<const TSt
if (!calculation.IsExpectedSize(embedding)) {
return std::numeric_limits<ui32>::max();
}
return calculation.FindClosest(clusters, embedding.data());
return calculation.FindClosest(clusters, embedding);
}

void AddRowMain2Build(TBufferData& buffer, ui32 parent, TArrayRef<const TCell> key, const NTable::TRowState& row);
Expand Down
42 changes: 19 additions & 23 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,16 @@ static constexpr const char* Name(TIndexBuildInfo::EState state) noexcept {
static std::tuple<ui32, ui32, ui32> ComputeKMeansBoundaries(const NSchemeShard::TTableInfo& tableInfo, const TIndexBuildInfo& buildInfo) {
const auto& kmeans = buildInfo.KMeans;
Y_ASSERT(kmeans.K != 0);
Y_ASSERT((kmeans.K & (kmeans.K - 1)) == 0);
const auto count = TIndexBuildInfo::TKMeans::BinPow(kmeans.K, kmeans.Level);
ui32 step = 1;
auto parts = count;
auto shards = tableInfo.GetShard2PartitionIdx().size();
if (!buildInfo.KMeans.NeedsAnotherLevel() || shards <= 1) {
shards = 1;
parts = 1;
if (!buildInfo.KMeans.NeedsAnotherLevel() || count <= 1 || shards <= 1) {
return {1, 1, 1};
}
for (; shards < parts; parts /= 2) {
for (; 2 * shards <= parts; parts = (parts + 1) / 2) {
step *= 2;
}
for (; parts < shards / 2; parts *= 2) {
Y_ASSERT(step == 1);
}
return {count, parts, step};
}

Expand Down Expand Up @@ -341,7 +336,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateBuildPropose(
modifyScheme.SetWorkingDir(path.Dive(buildInfo.IndexName).PathString());
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpInitiateBuildIndexImplTable);
auto& op = *modifyScheme.MutableCreateTable();
const char* suffix = buildInfo.KMeans.Level % 2 != 0 ? BuildSuffix0 : BuildSuffix1;
std::string_view suffix = buildInfo.KMeans.Level % 2 != 0 ? BuildSuffix0 : BuildSuffix1;
op = CalcVectorKmeansTreePostingImplTableDesc(tableInfo, tableInfo->PartitionConfig(), implTableColumns, {}, suffix);

const auto [count, parts, step] = ComputeKMeansBoundaries(*tableInfo, buildInfo);
Expand All @@ -351,25 +346,24 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateBuildPropose(

auto& policy = *config.MutablePartitioningPolicy();
policy.SetSizeToSplit(0); // disable auto split/merge
policy.SetMinPartitionsCount(parts);
policy.SetMaxPartitionsCount(parts);
policy.ClearFastSplitSettings();
policy.ClearSplitByLoadSettings();

op.ClearSplitBoundary();
if (parts <= 1) {
return propose;
}
auto i = buildInfo.KMeans.Parent;
for (const auto end = i + count;;) {
i += step;
if (i >= end) {
Y_ASSERT(op.SplitBoundarySize() == std::min(count, parts) - 1);
break;
static constexpr std::string_view LogPrefix = "Create build table boundaries for ";
LOG_D(buildInfo.Id << " table " << suffix
<< ", count: " << count << ", parts: " << parts << ", step: " << step
<< ", kmeans: " << buildInfo.KMeansTreeToDebugStr());
if (parts > 1) {
const auto parentFrom = buildInfo.KMeans.ParentEnd + 1;
for (auto i = parentFrom + step, e = parentFrom + count; i < e; i += step) {
LOG_D(buildInfo.Id << " table " << suffix << " value: " << i);
auto cell = TCell::Make(i);
op.AddSplitBoundary()->SetSerializedKeyPrefix(TSerializedCellVec::Serialize({&cell, 1}));
}
auto cell = TCell::Make(i);
op.AddSplitBoundary()->SetSerializedKeyPrefix(TSerializedCellVec::Serialize({&cell, 1}));
}
policy.SetMinPartitionsCount(op.SplitBoundarySize() + 1);
policy.SetMaxPartitionsCount(op.SplitBoundarySize() + 1);
return propose;
}

Expand Down Expand Up @@ -574,7 +568,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
auto& clusters = *ev->Record.MutableClusters();
clusters.Reserve(buildInfo.Sample.Rows.size());
for (const auto& [_, row] : buildInfo.Sample.Rows) {
*clusters.Add() = row;
const auto cluster = std::string_view{row}.substr(sizeof(ui16) + sizeof(ui32));
Y_DEBUG_ABORT_UNLESS(cluster == TSerializedCellVec{row}.GetCells().at(0).AsBuf());
*clusters.Add() = cluster;
}

ev->Record.SetPostingName(path.Dive(buildInfo.KMeans.WriteTo()).PathString());
Expand Down
25 changes: 20 additions & 5 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3672,17 +3672,32 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
}

float CalcProgressPercent() const {
const auto total = Shards.size();
const auto done = DoneShards.size();
if (IsBuildVectorIndex()) {
const auto inProgress = InProgressShards.size();
const auto toUpload = ToUploadShards.size();
Y_ASSERT(KMeans.Level != 0);
// TODO(mbkkt) better calculation for vector index
return KMeans.Level * 100.0 / KMeans.Levels;
if (!KMeans.NeedsAnotherLevel() && !KMeans.NeedsAnotherParent()
&& toUpload == 0 && inProgress == 0) {
return 100.f;
}
auto percent = static_cast<float>(KMeans.Level - 1) / KMeans.Levels;
auto multiply = 1.f / KMeans.Levels;
if (KMeans.State == TKMeans::MultiLocal) {
percent += (multiply * (total - inProgress - toUpload)) / total;
} else {
const auto parentSize = KMeans.BinPow(KMeans.K, KMeans.Level - 1);
const auto parentFrom = KMeans.ParentEnd - KMeans.BinPow(KMeans.K, KMeans.Level - 1) + 1;
percent += (multiply * (KMeans.Parent - parentFrom)) / parentSize;
}
return 100.f * percent;
}
if (Shards) {
float totalShards = Shards.size();
return 100.0 * DoneShards.size() / totalShards;
return (100.f * done) / total;
}
// No shards - no progress
return 0.0;
return 0.f;
}

void SerializeToProto(TSchemeShard* ss, NKikimrIndexBuilder::TColumnBuildSettings* to) const;
Expand Down

0 comments on commit 8a31f17

Please sign in to comment.