diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp
index e802c090abe2..4f92cdc7b039 100644
--- a/velox/exec/HashTable.cpp
+++ b/velox/exec/HashTable.cpp
@@ -350,6 +350,16 @@ inline uint64_t mixNormalizedKey(uint64_t k, uint8_t bits) {
   auto h = (k ^ ((k >> 32))) * prime1;
   return h + (h >> bits) * prime2 + (h >> (2 * bits)) * prime3;
 }
+
+void populateNormalizedKeys(HashLookup& lookup, int8_t sizeBits) {
+  lookup.normalizedKeys.resize(lookup.rows.back() + 1);
+  auto hashes = lookup.hashes.data();
+  for (auto row : lookup.rows) {
+    auto hash = hashes[row];
+    lookup.normalizedKeys[row] = hash; // NOLINT
+    hashes[row] = mixNormalizedKey(hash, sizeBits);
+  }
+}
 } // namespace
 
 template <bool ignoreNullKeys>
@@ -359,20 +369,10 @@ void HashTable<ignoreNullKeys>::groupProbe(HashLookup& lookup) {
     return;
   }
   // Do size-based rehash before mixing hashes from normalized keys
-  // because The
-  // size of the table affects the mixing.
+  // because the size of the table affects the mixing.
   checkSize(lookup.rows.size());
   if (hashMode_ == HashMode::kNormalizedKey) {
-    auto numRows = lookup.rows.size();
-    lookup.normalizedKeys.resize(numRows);
-    auto rows = lookup.rows.data();
-    for (int i = 0; i < numRows; ++i) {
-      auto row = rows[i];
-      auto hashes = lookup.hashes.data();
-      auto hash = hashes[row];
-      lookup.normalizedKeys[row] = hash; // NOLINT
-      hashes[row] = mixNormalizedKey(hash, sizeBits_);
-    }
+    populateNormalizedKeys(lookup, sizeBits_);
   }
   ProbeState state1;
   ProbeState state2;
@@ -410,12 +410,12 @@ void HashTable<ignoreNullKeys>::groupProbe(HashLookup& lookup) {
 
 template <bool ignoreNullKeys>
 void HashTable<ignoreNullKeys>::arrayGroupProbe(HashLookup& lookup) {
+  VELOX_DCHECK(!lookup.hashes.empty());
+  VELOX_DCHECK(!lookup.hits.empty());
+
   int32_t numProbes = lookup.rows.size();
   const vector_size_t* rows = lookup.rows.data();
-  assert(!lookup.hashes.empty());
-  assert(!lookup.hits.empty());
   auto hashes = lookup.hashes.data();
-  assert(!lookup.hits.empty());
   auto groups = lookup.hits.data();
   int32_t i = 0;
   if (process::hasAvx2() && simd::isDense(rows, numProbes)) {
@@ -471,13 +471,7 @@ void HashTable<ignoreNullKeys>::joinProbe(HashLookup& lookup) {
     return;
   }
   if (hashMode_ == HashMode::kNormalizedKey) {
-    lookup.normalizedKeys.resize(lookup.rows.back() + 1);
-    auto hashes = lookup.hashes.data();
-    for (auto row : lookup.rows) {
-      auto hash = hashes[row];
-      lookup.normalizedKeys[row] = hash; // NOLINT
-      hashes[row] = mixNormalizedKey(hash, sizeBits_);
-    }
+    populateNormalizedKeys(lookup, sizeBits_);
   }
   int32_t probeIndex = 0;
   int32_t numProbes = lookup.rows.size();
@@ -619,8 +613,8 @@ void HashTable<ignoreNullKeys>::insertForGroupBy(
   if (hashMode_ == HashMode::kArray) {
     for (auto i = 0; i < numGroups; ++i) {
       auto index = hashes[i];
-      VELOX_CHECK(index < size_);
-      VELOX_CHECK(table_[index] == nullptr);
+      VELOX_CHECK_LT(index, size_);
+      VELOX_CHECK_NULL(table_[index]);
       table_[index] = groups[i];
     }
   } else {
@@ -746,7 +740,7 @@ void HashTable<ignoreNullKeys>::insertForJoin(
   if (hashMode_ == HashMode::kArray) {
     for (auto i = 0; i < numGroups; ++i) {
       auto index = hashes[i];
-      VELOX_CHECK(index < size_);
+      VELOX_CHECK_LT(index, size_);
       arrayPushRow(groups[i], index);
     }
     return;
diff --git a/velox/exec/tests/HashTableTest.cpp b/velox/exec/tests/HashTableTest.cpp
index 72ce69a0d9c9..f372886ead0d 100644
--- a/velox/exec/tests/HashTableTest.cpp
+++ b/velox/exec/tests/HashTableTest.cpp
@@ -149,27 +149,38 @@ class HashTableTest : public testing::Test {
       const RowVector& input,
       HashLookup& lookup,
       HashTable<false>& table) {
+    const SelectivityVector rows(input.size());
+    insertGroups(input, rows, lookup, table);
+  }
+
+  void insertGroups(
+      const RowVector& input,
+      const SelectivityVector& rows,
+      HashLookup& lookup,
+      HashTable<false>& table) {
+    lookup.reset(rows.end());
+    lookup.rows.clear();
+    rows.applyToSelected([&](auto row) { lookup.rows.push_back(row); });
+
     auto& hashers = table.hashers();
-    SelectivityVector activeRows(input.size());
     auto mode = table.hashMode();
     bool rehash = false;
     for (int32_t i = 0; i < hashers.size(); ++i) {
       auto key = input.childAt(hashers[i]->channel());
       if (mode != BaseHashTable::HashMode::kHash) {
-        if (!hashers[i]->computeValueIds(*key, activeRows, lookup.hashes)) {
+        if (!hashers[i]->computeValueIds(*key, rows, lookup.hashes)) {
           rehash = true;
         }
       } else {
-        hashers[i]->hash(*key, activeRows, i > 0, lookup.hashes);
+        hashers[i]->hash(*key, rows, i > 0, lookup.hashes);
       }
     }
 
-    std::iota(lookup.rows.begin(), lookup.rows.end(), 0);
     if (rehash) {
       if (table.hashMode() != BaseHashTable::HashMode::kHash) {
         table.decideHashMode(input.size());
       }
-      insertGroups(input, lookup, table);
+      insertGroups(input, rows, lookup, table);
       return;
     }
     table.groupProbe(lookup);
@@ -538,3 +549,26 @@ TEST_F(HashTableTest, enableRangeWhereCan) {
   lookup->reset(data->size());
   insertGroups(*data, *lookup, *table);
 }
+
+TEST_F(HashTableTest, arrayProbeNormalizedKey) {
+  auto table = createHashTableForAggregation(ROW({"a"}, {BIGINT()}), 1);
+  auto lookup = std::make_unique<HashLookup>(table->hashers());
+
+  for (auto i = 0; i < 200; ++i) {
+    auto data = vectorMaker_->rowVector({
+        vectorMaker_->flatVector<int64_t>(
+            10'000, [&](auto row) { return i * 10'000 + row; }),
+    });
+
+    SelectivityVector rows(5'000);
+    insertGroups(*data, rows, *lookup, *table);
+
+    rows.resize(10'000);
+    rows.clearAll();
+    rows.setValidRange(5'000, 10'000, true);
+    rows.updateBounds();
+    insertGroups(*data, rows, *lookup, *table);
+  }
+
+  ASSERT_TRUE(table->hashMode() == BaseHashTable::HashMode::kNormalizedKey);
+}