Skip to content

Commit

Permalink
Fixed RH hash table bucket policy (ydb-platform#4341)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitstn authored and Hor911 committed May 13, 2024
1 parent b58c834 commit e79f08b
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 5 deletions.
14 changes: 9 additions & 5 deletions ydb/library/yql/minikql/comp_nodes/mkql_rh_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class TRobinHoodHashBase {
: HashLocal(std::move(hash))
, EqualLocal(std::move(equal))
, Capacity(initialCapacity)
, CapacityShift(64 - MostSignificantBit(initialCapacity))
, Allocator()
, SelfHash(GetSelfHash(this))
{
Expand All @@ -99,7 +100,7 @@ class TRobinHoodHashBase {
// returns iterator
Y_FORCE_INLINE char* Insert(TKey key, bool& isNew) {
auto hash = HashLocal(key);
auto ptr = MakeIterator(hash, Data, Capacity);
auto ptr = MakeIterator(hash, Data, CapacityShift);
auto ret = InsertImpl(key, hash, isNew, Data, DataEnd, ptr);
Size += isNew ? 1 : 0;
return ret;
Expand All @@ -121,7 +122,7 @@ class TRobinHoodHashBase {
for (size_t i = 0; i < batchRequest.size(); ++i) {
auto& r = batchRequest[i];
r.Hash = HashLocal(r.GetKey());
r.InitialIterator = MakeIterator(r.Hash, Data, Capacity);
r.InitialIterator = MakeIterator(r.Hash, Data, CapacityShift);
NYql::PrefetchForWrite(r.InitialIterator);
}

Expand Down Expand Up @@ -212,9 +213,9 @@ class TRobinHoodHashBase {
char* OriginalIterator;
};

Y_FORCE_INLINE char* MakeIterator(const ui64 hash, char* data, ui64 capacity) {
Y_FORCE_INLINE char* MakeIterator(const ui64 hash, char* data, ui64 capacityShift) {
// https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
ui64 bucket = (((unsigned __int128)(SelfHash ^ hash) * 11400714819323198485llu) >> 64) & (capacity - 1);
ui64 bucket = ((SelfHash ^ hash) * 11400714819323198485llu) >> capacityShift;
char* ptr = data + AsDeriv().GetCellSize() * bucket;
return ptr;
}
Expand Down Expand Up @@ -291,6 +292,7 @@ class TRobinHoodHashBase {
growFactor = 2;
}
auto newCapacity = Capacity * growFactor;
auto newCapacityShift = 64 - MostSignificantBit(newCapacity);
char *newData, *newDataEnd;
Allocate(newCapacity, newData, newDataEnd);
Y_DEFER {
Expand Down Expand Up @@ -319,13 +321,14 @@ class TRobinHoodHashBase {
r.Hash = HashLocal(r.GetKey());
}

r.InitialIterator = MakeIterator(r.Hash, newData, newCapacity);
r.InitialIterator = MakeIterator(r.Hash, newData, newCapacityShift);
NYql::PrefetchForWrite(r.InitialIterator);
}

CopyBatch({batch.data(), batchLen}, newData, newDataEnd);

Capacity = newCapacity;
CapacityShift = newCapacityShift;
std::swap(Data, newData);
std::swap(DataEnd, newDataEnd);
}
Expand Down Expand Up @@ -378,6 +381,7 @@ class TRobinHoodHashBase {
private:
ui64 Size = 0;
ui64 Capacity;
ui64 CapacityShift;
TAllocator Allocator;
const ui64 SelfHash;
char* Data = nullptr;
Expand Down
82 changes: 82 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,88 @@ Y_UNIT_TEST_SUITE(TMiniKQLRobinHoodHashTest) {
Cerr << "maxDistance: " << maxDistance << "\n";
UNIT_ASSERT(maxDistance < 10);
}

Y_UNIT_TEST(RehashCollisions) {
TRobinHoodHashSet<ui64> rh;
TVector<ui64> values;
const ui64 N = 1500000;
values.reserve(N);
for (ui64 i = 1; i <= N; ++i) {
auto k = 64 * (i >> 4) + ((i & 8) ? 32 : 0) + (i & 7);
values.push_back(k);
}
/*
for (ui64 i = 0; i < 32; ++i) {
Cerr << values[i] << "\n";
}
for (ui64 i = 0; i < 32; ++i) {
Cerr << values[values.size() - 32 + i] << "\n";
}*/

for (ui64 i = 0; i < values.size(); ++i) {
auto k = values[i];
bool isNew;
auto iter = rh.Insert(k, isNew);
if (rh.GetKey(iter) != k) {
UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k);
}

if (isNew) {
rh.CheckGrow();
}

if (i + 1 != rh.GetSize()) {
UNIT_ASSERT_VALUES_EQUAL(i + 1, rh.GetSize());
}
}

TRobinHoodHashSet<ui64> rh2;

i32 maxDistance1 = 0;
ui64 j = 0;
for (auto it = rh.Begin(); it != rh.End(); rh.Advance(it)) {
if (!rh.IsValid(it)) {
continue;
}

auto distance = rh.GetPSL(it).Distance;
maxDistance1 = Max(maxDistance1, distance);

auto k = rh.GetKey(it);
bool isNew;
auto iter = rh2.Insert(k, isNew);
if (rh2.GetKey(iter) != k) {
UNIT_ASSERT_VALUES_EQUAL(rh2.GetKey(iter), k);
}

if (isNew) {
rh2.CheckGrow();
}

if (j + 1 != rh2.GetSize()) {
UNIT_ASSERT_VALUES_EQUAL(j + 1, rh2.GetSize());
}

++j;
}

Cerr << "maxDistance1: " << maxDistance1 << "\n";
UNIT_ASSERT(maxDistance1 < 20);

i32 maxDistance2 = 0;
for (auto it = rh2.Begin(); it != rh2.End(); rh2.Advance(it)) {
if (!rh2.IsValid(it)) {
continue;
}

auto distance = rh2.GetPSL(it).Distance;
maxDistance2 = Max(maxDistance2, distance);
}

Cerr << "maxDistance2: " << maxDistance2 << "\n";
UNIT_ASSERT(maxDistance2 < 20);
}
}

}
Expand Down

0 comments on commit e79f08b

Please sign in to comment.